diff options
author | Gilles Chehade <gilles@cvs.openbsd.org> | 2010-10-09 22:12:27 +0000 |
---|---|---|
committer | Gilles Chehade <gilles@cvs.openbsd.org> | 2010-10-09 22:12:27 +0000 |
commit | faec688b7be6c2adbda58666a70aa4656c3d48e6 (patch) | |
tree | 4aa29c39e8ebba9d1f7698cbed12e3b8afa7e642 /usr.sbin/smtpd | |
parent | 487e7fe9e1b5ca3dbee9602d4fd487bfd6b3a553 (diff) |
these need to be re-added too
Diffstat (limited to 'usr.sbin/smtpd')
-rw-r--r-- | usr.sbin/smtpd/queue_shared.c | 817 | ||||
-rw-r--r-- | usr.sbin/smtpd/runner.c | 889 |
2 files changed, 1706 insertions, 0 deletions
diff --git a/usr.sbin/smtpd/queue_shared.c b/usr.sbin/smtpd/queue_shared.c new file mode 100644 index 00000000000..6f2f6dae9c4 --- /dev/null +++ b/usr.sbin/smtpd/queue_shared.c @@ -0,0 +1,817 @@ +/* $OpenBSD: queue_shared.c,v 1.31 2010/10/09 22:12:26 gilles Exp $ */ + +/* + * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> + * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> + * Copyright (c) 2008-2009 Jacek Masiulaniec <jacekm@dobremiasto.net> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include <sys/types.h> +#include <sys/queue.h> +#include <sys/tree.h> +#include <sys/param.h> +#include <sys/socket.h> +#include <sys/stat.h> + +#include <dirent.h> +#include <err.h> +#include <errno.h> +#include <event.h> +#include <fcntl.h> +#include <pwd.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <time.h> +#include <unistd.h> + +#include "smtpd.h" + +#define QWALK_AGAIN 0x1 +#define QWALK_RECURSE 0x2 +#define QWALK_RETURN 0x3 + +struct qwalk { + char path[MAXPATHLEN]; + DIR *dirs[3]; + int (*filefn)(struct qwalk *, char *); + int bucket; + int level; + int strict; +}; + +int walk_simple(struct qwalk *, char *); +int walk_queue(struct qwalk *, char *); + +void display_envelope(struct message *, int); +void getflag(u_int *, int, char *, char *, size_t); + +int +queue_create_layout_message(char *queuepath, char *message_id) +{ + char rootdir[MAXPATHLEN]; + char evpdir[MAXPATHLEN]; + + if (! bsnprintf(rootdir, sizeof(rootdir), "%s/%d.XXXXXXXXXXXXXXXX", + queuepath, time(NULL))) + fatalx("queue_create_layout_message: snprintf"); + + if (mkdtemp(rootdir) == NULL) { + if (errno == ENOSPC) { + log_debug("FAILED WITH ENOSPC"); + bzero(message_id, MAX_ID_SIZE); + return 0; + } + fatal("queue_create_layout_message: mkdtemp"); + } + + if (strlcpy(message_id, rootdir + strlen(queuepath) + 1, MAX_ID_SIZE) + >= MAX_ID_SIZE) + fatalx("queue_create_layout_message: truncation"); + + if (! bsnprintf(evpdir, sizeof(evpdir), "%s%s", rootdir, + PATH_ENVELOPES)) + fatalx("queue_create_layout_message: snprintf"); + + if (mkdir(evpdir, 0700) == -1) { + if (errno == ENOSPC) { + log_debug("FAILED WITH ENOSPC"); + rmdir(rootdir); + bzero(message_id, MAX_ID_SIZE); + return 0; + } + fatal("queue_create_layout_message: mkdir"); + } + return 1; +} + +void +queue_delete_layout_message(char *queuepath, char *msgid) +{ + char rootdir[MAXPATHLEN]; + char purgedir[MAXPATHLEN]; + + if (! bsnprintf(rootdir, sizeof(rootdir), "%s/%s", queuepath, msgid)) + fatalx("snprintf"); + + if (! bsnprintf(purgedir, sizeof(purgedir), "%s/%s", PATH_PURGE, msgid)) + fatalx("snprintf"); + + if (rename(rootdir, purgedir) == -1) { + log_debug("ID: %s", msgid); + log_debug("PATH: %s", rootdir); + log_debug("PURGE: %s", purgedir); + fatal("queue_delete_layout_message: rename"); + } +} + +int +queue_record_layout_envelope(char *queuepath, struct message *message) +{ + char evpname[MAXPATHLEN]; + FILE *fp; + int fd; + +again: + if (! bsnprintf(evpname, sizeof(evpname), "%s/%s%s/%s.%qu", queuepath, + message->message_id, PATH_ENVELOPES, message->message_id, + (u_int64_t)arc4random())) + fatalx("queue_record_incoming_envelope: snprintf"); + + fd = open(evpname, O_WRONLY|O_CREAT|O_EXCL, 0600); + if (fd == -1) { + if (errno == EEXIST) + goto again; + if (errno == ENOSPC || errno == ENFILE) + goto tempfail; + fatal("queue_record_incoming_envelope: open"); + } + + fp = fdopen(fd, "w"); + if (fp == NULL) + fatal("queue_record_incoming_envelope: fdopen"); + + message->creation = time(NULL); + if (strlcpy(message->message_uid, strrchr(evpname, '/') + 1, + sizeof(message->message_uid)) >= sizeof(message->message_uid)) + fatalx("queue_record_incoming_envelope: truncation"); + + if (fwrite(message, sizeof (struct message), 1, fp) != 1) { + if (errno == ENOSPC) + goto tempfail; + fatal("queue_record_incoming_envelope: write"); + } + + if (! safe_fclose(fp)) + goto tempfail; + + return 1; + +tempfail: + unlink(evpname); + close(fd); + message->creation = 0; + message->message_uid[0] = '\0'; + + return 0; +} + +int +queue_remove_layout_envelope(char *queuepath, struct message *message) +{ + char pathname[MAXPATHLEN]; + + if (! bsnprintf(pathname, sizeof(pathname), "%s/%s%s/%s", queuepath, + message->message_id, PATH_ENVELOPES, message->message_uid)) + fatal("queue_remove_incoming_envelope: snprintf"); + + if (unlink(pathname) == -1) + fatal("queue_remove_incoming_envelope: unlink"); + + return 1; +} + +int +queue_commit_layout_message(char *queuepath, struct message *messagep) +{ + char rootdir[MAXPATHLEN]; + char queuedir[MAXPATHLEN]; + + if (! bsnprintf(rootdir, sizeof(rootdir), "%s/%s", queuepath, + messagep->message_id)) + fatal("queue_commit_message_incoming: snprintf"); + + if (! bsnprintf(queuedir, sizeof(queuedir), "%s/%d", PATH_QUEUE, + queue_hash(messagep->message_id))) + fatal("queue_commit_message_incoming: snprintf"); + + if (mkdir(queuedir, 0700) == -1) { + if (errno == ENOSPC) + return 0; + if (errno != EEXIST) + fatal("queue_commit_message_incoming: mkdir"); + } + + if (strlcat(queuedir, "/", sizeof(queuedir)) >= sizeof(queuedir) || + strlcat(queuedir, messagep->message_id, sizeof(queuedir)) >= + sizeof(queuedir)) + fatalx("queue_commit_incoming_message: truncation"); + + if (rename(rootdir, queuedir) == -1) { + if (errno == ENOSPC) + return 0; + fatal("queue_commit_message_incoming: rename"); + } + + return 1; +} + +int +queue_open_layout_messagefile(char *queuepath, struct message *messagep) +{ + char pathname[MAXPATHLEN]; + + if (! bsnprintf(pathname, sizeof(pathname), "%s/%s/message", queuepath, + messagep->message_id)) + fatal("queue_open_incoming_message_file: snprintf"); + + return open(pathname, O_CREAT|O_EXCL|O_RDWR, 0600); +} + +int +enqueue_create_layout(char *msgid) +{ + return queue_create_layout_message(PATH_ENQUEUE, msgid); +} + +void +enqueue_delete_message(char *msgid) +{ + queue_delete_layout_message(PATH_ENQUEUE, msgid); +} + +int +enqueue_record_envelope(struct message *message) +{ + return queue_record_layout_envelope(PATH_ENQUEUE, message); +} + +int +enqueue_remove_envelope(struct message *message) +{ + return queue_remove_layout_envelope(PATH_ENQUEUE, message); +} + +int +enqueue_commit_message(struct message *message) +{ + return queue_commit_layout_message(PATH_ENQUEUE, message); +} + +int +enqueue_open_messagefile(struct message *message) +{ + return queue_open_layout_messagefile(PATH_ENQUEUE, message); +} + +int +bounce_create_layout(char *msgid, struct message *message) +{ + char msgpath[MAXPATHLEN]; + char lnkpath[MAXPATHLEN]; + + if (! queue_create_layout_message(PATH_BOUNCE, msgid)) + return 0; + + if (! bsnprintf(msgpath, sizeof(msgpath), "%s/%d/%s/message", + PATH_QUEUE, queue_hash(message->message_id), + message->message_id)) + return 0; + + if (! bsnprintf(lnkpath, sizeof(lnkpath), "%s/%s/message", + PATH_BOUNCE, msgid)) + return 0; + + if (link(msgpath, lnkpath) == -1) + fatal("link"); + + return 1; +} + +void +bounce_delete_message(char *msgid) +{ + queue_delete_layout_message(PATH_BOUNCE, msgid); +} + +int +bounce_record_envelope(struct message *message) +{ + message->lasttry = 0; + message->retry = 0; + return queue_record_layout_envelope(PATH_BOUNCE, message); +} + +int +bounce_remove_envelope(struct message *message) +{ + return queue_remove_layout_envelope(PATH_BOUNCE, message); +} + +int +bounce_commit_message(struct message *message) +{ + return queue_commit_layout_message(PATH_BOUNCE, message); +} + +int +bounce_record_message(struct message *messagep) +{ + char msgid[MAX_ID_SIZE]; + struct message mbounce; + + if (messagep->type == T_BOUNCE_MESSAGE) { + log_debug("mailer daemons loop detected !"); + return 0; + } + + mbounce = *messagep; + mbounce.type = T_BOUNCE_MESSAGE; + mbounce.status &= ~S_MESSAGE_PERMFAILURE; + + if (! bounce_create_layout(msgid, messagep)) + return 0; + + strlcpy(mbounce.message_id, msgid, sizeof(mbounce.message_id)); + if (! bounce_record_envelope(&mbounce)) + return 0; + + return bounce_commit_message(&mbounce); +} + +int +queue_create_incoming_layout(char *msgid) +{ + return queue_create_layout_message(PATH_INCOMING, msgid); +} + +void +queue_delete_incoming_message(char *msgid) +{ + queue_delete_layout_message(PATH_INCOMING, msgid); +} + +int +queue_record_incoming_envelope(struct message *message) +{ + return queue_record_layout_envelope(PATH_INCOMING, message); +} + +int +queue_remove_incoming_envelope(struct message *message) +{ + return queue_remove_layout_envelope(PATH_INCOMING, message); +} + +int +queue_commit_incoming_message(struct message *message) +{ + return queue_commit_layout_message(PATH_INCOMING, message); +} + +int +queue_open_incoming_message_file(struct message *message) +{ + return queue_open_layout_messagefile(PATH_INCOMING, message); +} + +int +queue_open_message_file(char *msgid) +{ + int fd; + char pathname[MAXPATHLEN]; + u_int16_t hval; + + hval = queue_hash(msgid); + + if (! bsnprintf(pathname, sizeof(pathname), "%s/%d/%s/message", + PATH_QUEUE, hval, msgid)) + fatal("queue_open_message_file: snprintf"); + + if ((fd = open(pathname, O_RDONLY)) == -1) + fatal("queue_open_message_file: open"); + + return fd; +} + +void +queue_delete_message(char *msgid) +{ + char rootdir[MAXPATHLEN]; + char evpdir[MAXPATHLEN]; + char msgpath[MAXPATHLEN]; + u_int16_t hval; + + hval = queue_hash(msgid); + if (! bsnprintf(rootdir, sizeof(rootdir), "%s/%d/%s", PATH_QUEUE, + hval, msgid)) + fatal("queue_delete_message: snprintf"); + + if (! bsnprintf(evpdir, sizeof(evpdir), "%s%s", rootdir, + PATH_ENVELOPES)) + fatal("queue_delete_message: snprintf"); + + if (! bsnprintf(msgpath, sizeof(msgpath), "%s/message", rootdir)) + fatal("queue_delete_message: snprintf"); + + if (unlink(msgpath) == -1) + fatal("queue_delete_message: unlink"); + + if (rmdir(evpdir) == -1) { + /* It is ok to fail rmdir with ENOENT here + * because upon successful delivery of the + * last envelope, we remove the directory. + */ + if (errno != ENOENT) + fatal("queue_delete_message: rmdir"); + } + + if (rmdir(rootdir) == -1) + fatal("#2 queue_delete_message: rmdir"); + + if (! bsnprintf(rootdir, sizeof(rootdir), "%s/%d", PATH_QUEUE, hval)) + fatal("queue_delete_message: snprintf"); + + rmdir(rootdir); + + return; +} + +void +queue_message_update(struct message *messagep) +{ + messagep->flags &= ~F_MESSAGE_PROCESSING; + messagep->status &= ~(S_MESSAGE_ACCEPTED|S_MESSAGE_REJECTED); + messagep->batch_id = 0; + messagep->retry++; + + if (messagep->status & S_MESSAGE_PERMFAILURE) { + if (messagep->type != T_BOUNCE_MESSAGE && + messagep->sender.user[0] != '\0') + bounce_record_message(messagep); + queue_remove_envelope(messagep); + return; + } + + if (messagep->status & S_MESSAGE_TEMPFAILURE) { + messagep->status &= ~S_MESSAGE_TEMPFAILURE; + queue_update_envelope(messagep); + return; + } + + /* no error, remove envelope */ + queue_remove_envelope(messagep); +} + +int +queue_remove_envelope(struct message *messagep) +{ + char pathname[MAXPATHLEN]; + u_int16_t hval; + + hval = queue_hash(messagep->message_id); + + if (! bsnprintf(pathname, sizeof(pathname), "%s/%d/%s%s/%s", + PATH_QUEUE, hval, messagep->message_id, PATH_ENVELOPES, + messagep->message_uid)) + fatal("queue_remove_envelope: snprintf"); + + if (unlink(pathname) == -1) + fatal("queue_remove_envelope: unlink"); + + if (! bsnprintf(pathname, sizeof(pathname), "%s/%d/%s%s", PATH_QUEUE, + hval, messagep->message_id, PATH_ENVELOPES)) + fatal("queue_remove_envelope: snprintf"); + + if (rmdir(pathname) != -1) + queue_delete_message(messagep->message_id); + + return 1; +} + +int +queue_update_envelope(struct message *messagep) +{ + char temp[MAXPATHLEN]; + char dest[MAXPATHLEN]; + FILE *fp; + u_int64_t batch_id; + + batch_id = messagep->batch_id; + messagep->batch_id = 0; + + if (! bsnprintf(temp, sizeof(temp), "%s/envelope.tmp", PATH_QUEUE)) + fatalx("queue_update_envelope"); + + if (! bsnprintf(dest, sizeof(dest), "%s/%d/%s%s/%s", PATH_QUEUE, + queue_hash(messagep->message_id), messagep->message_id, + PATH_ENVELOPES, messagep->message_uid)) + fatal("queue_update_envelope: snprintf"); + + fp = fopen(temp, "w"); + if (fp == NULL) { + if (errno == ENOSPC || errno == ENFILE) + goto tempfail; + fatal("queue_update_envelope: open"); + } + if (fwrite(messagep, sizeof(struct message), 1, fp) != 1) { + if (errno == ENOSPC) + goto tempfail; + fatal("queue_update_envelope: fwrite"); + } + if (! safe_fclose(fp)) + goto tempfail; + + if (rename(temp, dest) == -1) { + if (errno == ENOSPC) + goto tempfail; + fatal("queue_update_envelope: rename"); + } + + messagep->batch_id = batch_id; + return 1; + +tempfail: + if (unlink(temp) == -1) + fatal("queue_update_envelope: unlink"); + if (fp) + fclose(fp); + + messagep->batch_id = batch_id; + return 0; +} + +int +queue_load_envelope(struct message *messagep, char *evpid) +{ + char pathname[MAXPATHLEN]; + char msgid[MAX_ID_SIZE]; + FILE *fp; + + if (strlcpy(msgid, evpid, sizeof(msgid)) >= sizeof(msgid)) + fatalx("queue_load_envelope: truncation"); + *strrchr(msgid, '.') = '\0'; + + if (! bsnprintf(pathname, sizeof(pathname), "%s/%d/%s%s/%s", PATH_QUEUE, + queue_hash(msgid), msgid, PATH_ENVELOPES, evpid)) + fatalx("queue_load_envelope: snprintf"); + + fp = fopen(pathname, "r"); + if (fp == NULL) { + if (errno == ENOSPC || errno == ENFILE) + return 0; + fatal("queue_load_envelope: fopen"); + } + if (fread(messagep, sizeof(struct message), 1, fp) != 1) + fatal("queue_load_envelope: fread"); + fclose(fp); + + return 1; +} + +u_int16_t +queue_hash(char *msgid) +{ + u_int16_t h; + + for (h = 5381; *msgid; msgid++) + h = ((h << 5) + h) + *msgid; + + return (h % DIRHASH_BUCKETS); +} + +struct qwalk * +qwalk_new(char *path) +{ + struct qwalk *q; + + q = calloc(1, sizeof(struct qwalk)); + if (q == NULL) + fatal("qwalk_new: calloc"); + + strlcpy(q->path, path, sizeof(q->path)); + + q->level = 0; + q->strict = 0; + q->filefn = walk_simple; + + if (smtpd_process == PROC_QUEUE || smtpd_process == PROC_RUNNER) + q->strict = 1; + + if (strcmp(path, PATH_QUEUE) == 0) + q->filefn = walk_queue; + + q->dirs[0] = opendir(q->path); + if (q->dirs[0] == NULL) + fatal("qwalk_new: opendir"); + + return (q); +} + +int +qwalk(struct qwalk *q, char *filepath) +{ + struct dirent *dp; + +again: + errno = 0; + dp = readdir(q->dirs[q->level]); + if (errno) + fatal("qwalk: readdir"); + if (dp == NULL) { + closedir(q->dirs[q->level]); + q->dirs[q->level] = NULL; + if (q->level == 0) + return (0); + q->level--; + goto again; + } + + if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0) + goto again; + + switch (q->filefn(q, dp->d_name)) { + case QWALK_AGAIN: + goto again; + case QWALK_RECURSE: + goto recurse; + case QWALK_RETURN: + if (! bsnprintf(filepath, MAXPATHLEN, "%s/%s", q->path, + dp->d_name)) + fatalx("qwalk: snprintf"); + return (1); + default: + fatalx("qwalk: callback failed"); + } + +recurse: + q->level++; + q->dirs[q->level] = opendir(q->path); + if (q->dirs[q->level] == NULL) { + if (errno == ENOENT && !q->strict) { + q->level--; + goto again; + } + fatal("qwalk: opendir"); + } + goto again; +} + +void +qwalk_close(struct qwalk *q) +{ + int i; + + for (i = 0; i <= q->level; i++) + if (q->dirs[i]) + closedir(q->dirs[i]); + + bzero(q, sizeof(struct qwalk)); + free(q); +} + +int +walk_simple(struct qwalk *q, char *fname) +{ + return (QWALK_RETURN); +} + +int +walk_queue(struct qwalk *q, char *fname) +{ + const char *errstr; + + switch (q->level) { + case 0: + if (strcmp(fname, "envelope.tmp") == 0) + return (QWALK_AGAIN); + q->bucket = strtonum(fname, 0, DIRHASH_BUCKETS - 1, &errstr); + if (errstr) { + log_warnx("walk_queue: invalid bucket: %s", fname); + return (QWALK_AGAIN); + } + if (! bsnprintf(q->path, sizeof(q->path), "%s/%d", PATH_QUEUE, + q->bucket)) + fatalx("walk_queue: snprintf"); + return (QWALK_RECURSE); + case 1: + if (! bsnprintf(q->path, sizeof(q->path), "%s/%d/%s%s", + PATH_QUEUE, q->bucket, fname, PATH_ENVELOPES)) + fatalx("walk_queue: snprintf"); + return (QWALK_RECURSE); + case 2: + return (QWALK_RETURN); + } + + return (-1); +} + +void +show_queue(char *queuepath, int flags) +{ + char path[MAXPATHLEN]; + struct message message; + struct qwalk *q; + FILE *fp; + + log_init(1); + + if (chroot(PATH_SPOOL) == -1 || chdir(".") == -1) + err(1, "%s", PATH_SPOOL); + + q = qwalk_new(queuepath); + + while (qwalk(q, path)) { + fp = fopen(path, "r"); + if (fp == NULL) { + if (errno == ENOENT) + continue; + err(1, "%s", path); + } + + errno = 0; + if (fread(&message, sizeof(struct message), 1, fp) != 1) + err(1, "%s", path); + fclose(fp); + + display_envelope(&message, flags); + } + + qwalk_close(q); +} + +void +display_envelope(struct message *envelope, int flags) +{ + char status[128]; + + status[0] = '\0'; + + getflag(&envelope->status, S_MESSAGE_TEMPFAILURE, "TEMPFAIL", + status, sizeof(status)); + + if (envelope->status) + errx(1, "%s: unexpected status 0x%04x", envelope->message_uid, + envelope->status); + + getflag(&envelope->flags, F_MESSAGE_BOUNCE, "BOUNCE", + status, sizeof(status)); + getflag(&envelope->flags, F_MESSAGE_AUTHENTICATED, "AUTH", + status, sizeof(status)); + getflag(&envelope->flags, F_MESSAGE_PROCESSING, "PROCESSING", + status, sizeof(status)); + getflag(&envelope->flags, F_MESSAGE_SCHEDULED, "SCHEDULED", + status, sizeof(status)); + getflag(&envelope->flags, F_MESSAGE_ENQUEUED, "ENQUEUED", + status, sizeof(status)); + getflag(&envelope->flags, F_MESSAGE_FORCESCHEDULE, "SCHEDULED_MANUAL", + status, sizeof(status)); + + if (envelope->flags) + errx(1, "%s: unexpected flags 0x%04x", envelope->message_uid, + envelope->flags); + + if (status[0]) + status[strlen(status) - 1] = '\0'; + else + strlcpy(status, "-", sizeof(status)); + + switch (envelope->type) { + case T_MDA_MESSAGE: + printf("MDA"); + break; + case T_MTA_MESSAGE: + printf("MTA"); + break; + case T_BOUNCE_MESSAGE: + printf("BOUNCE"); + break; + default: + printf("UNKNOWN"); + } + + printf("|%s|%s|%s@%s|%s@%s|%d|%u", + envelope->message_uid, + status, + envelope->sender.user, envelope->sender.domain, + envelope->recipient.user, envelope->recipient.domain, + envelope->lasttry, + envelope->retry); + + if (envelope->session_errorline[0] != '\0') + printf("|%s", envelope->session_errorline); + + printf("\n"); +} + +void +getflag(u_int *bitmap, int bit, char *bitstr, char *buf, size_t len) +{ + if (*bitmap & bit) { + *bitmap &= ~bit; + strlcat(buf, bitstr, len); + strlcat(buf, ",", len); + } +} diff --git a/usr.sbin/smtpd/runner.c b/usr.sbin/smtpd/runner.c new file mode 100644 index 00000000000..d66a6c45a05 --- /dev/null +++ b/usr.sbin/smtpd/runner.c @@ -0,0 +1,889 @@ +/* $OpenBSD: runner.c,v 1.91 2010/10/09 22:12:26 gilles Exp $ */ + +/* + * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> + * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> + * Copyright (c) 2008-2009 Jacek Masiulaniec <jacekm@dobremiasto.net> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include <sys/types.h> +#include <sys/queue.h> +#include <sys/tree.h> +#include <sys/param.h> +#include <sys/socket.h> +#include <sys/stat.h> + +#include <netinet/in.h> +#include <arpa/inet.h> + +#include <ctype.h> +#include <dirent.h> +#include <err.h> +#include <errno.h> +#include <event.h> +#include <fcntl.h> +#include <libgen.h> +#include <netdb.h> +#include <pwd.h> +#include <signal.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <time.h> +#include <unistd.h> + +#include "smtpd.h" + +void runner_imsg(struct smtpd *, struct imsgev *, struct imsg *); + +__dead void runner_shutdown(void); +void runner_sig_handler(int, short, void *); +void runner_setup_events(struct smtpd *); +void runner_disable_events(struct smtpd *); + +void runner_reset_flags(void); +void runner_process_offline(struct smtpd *); + +void runner_timeout(int, short, void *); + +void runner_process_queue(struct smtpd *); +void runner_process_runqueue(struct smtpd *); +void runner_process_batchqueue(struct smtpd *); + +int runner_message_schedule(struct message *, time_t); + +void runner_purge_run(void); +void runner_purge_message(char *); + +int runner_check_loop(struct message *); + +struct batch *batch_record(struct smtpd *, struct message *); +struct batch *batch_lookup(struct smtpd *, struct message *); + +int runner_force_envelope_schedule(char *); +int runner_force_message_schedule(char *); + +int runner_force_envelope_remove(char *); +int runner_force_message_remove(char *); + +void +runner_imsg(struct smtpd *env, struct imsgev *iev, struct imsg *imsg) +{ + struct message *m; + struct remove *rem; + struct sched *sched; + + switch (imsg->hdr.type) { + case IMSG_QUEUE_MESSAGE_UPDATE: + env->stats->runner.active--; + queue_message_update(imsg->data); + return; + + case IMSG_MDA_SESS_NEW: + env->stats->mda.sessions_active--; + return; + + case IMSG_BATCH_DONE: + env->stats->mta.sessions_active--; + return; + + case IMSG_QUEUE_SCHEDULE: + sched = imsg->data; + sched->ret = 0; + if (valid_message_uid(sched->mid)) + sched->ret = runner_force_envelope_schedule(sched->mid); + else if (valid_message_id(sched->mid)) + sched->ret = runner_force_message_schedule(sched->mid); + imsg_compose_event(iev, IMSG_QUEUE_SCHEDULE, imsg->hdr.peerid, + 0, -1, sched, sizeof *sched); + return; + + case IMSG_QUEUE_REMOVE: + rem = imsg->data; + rem->ret = 0; + if (valid_message_uid(rem->mid)) + rem->ret = runner_force_envelope_remove(rem->mid); + else if (valid_message_id(rem->mid)) + rem->ret = runner_force_message_remove(rem->mid); + imsg_compose_event(iev, IMSG_QUEUE_REMOVE, imsg->hdr.peerid, 0, + -1, rem, sizeof *rem); + return; + + case IMSG_PARENT_ENQUEUE_OFFLINE: + runner_process_offline(env); + return; + + case IMSG_SMTP_ENQUEUE: + m = imsg->data; + if (imsg->fd < 0 || !bounce_session(env, imsg->fd, m)) { + m->status = S_MESSAGE_TEMPFAILURE; + queue_message_update(m); + } + return; + + case IMSG_QUEUE_PAUSE_LOCAL: + env->sc_opts |= SMTPD_MDA_PAUSED; + return; + + case IMSG_QUEUE_RESUME_LOCAL: + env->sc_opts &= ~SMTPD_MDA_PAUSED; + return; + + case IMSG_QUEUE_PAUSE_OUTGOING: + env->sc_opts |= SMTPD_MTA_PAUSED; + return; + + case IMSG_QUEUE_RESUME_OUTGOING: + env->sc_opts &= ~SMTPD_MTA_PAUSED; + return; + + case IMSG_CTL_VERBOSE: + log_verbose(*(int *)imsg->data); + return; + } + + fatalx("runner_imsg: unexpected imsg"); +} + +void +runner_sig_handler(int sig, short event, void *p) +{ + switch (sig) { + case SIGINT: + case SIGTERM: + runner_shutdown(); + break; + default: + fatalx("runner_sig_handler: unexpected signal"); + } +} + +void +runner_shutdown(void) +{ + log_info("runner handler exiting"); + _exit(0); +} + +void +runner_setup_events(struct smtpd *env) +{ + struct timeval tv; + + evtimer_set(&env->sc_ev, runner_timeout, env); + tv.tv_sec = 0; + tv.tv_usec = 10; + evtimer_add(&env->sc_ev, &tv); +} + +void +runner_disable_events(struct smtpd *env) +{ + evtimer_del(&env->sc_ev); +} + +pid_t +runner(struct smtpd *env) +{ + pid_t pid; + struct passwd *pw; + + struct event ev_sigint; + struct event ev_sigterm; + + struct peer peers[] = { + { PROC_QUEUE, imsg_dispatch } + }; + + switch (pid = fork()) { + case -1: + fatal("runner: cannot fork"); + case 0: + break; + default: + return (pid); + } + + purge_config(env, PURGE_EVERYTHING); + + pw = env->sc_pw; + + if (chroot(PATH_SPOOL) == -1) + fatal("runner: chroot"); + if (chdir("/") == -1) + fatal("runner: chdir(\"/\")"); + + smtpd_process = PROC_RUNNER; + setproctitle("%s", env->sc_title[smtpd_process]); + + if (setgroups(1, &pw->pw_gid) || + setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) || + setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid)) + fatal("runner: cannot drop privileges"); + + SPLAY_INIT(&env->batch_queue); + + imsg_callback = runner_imsg; + event_init(); + + signal_set(&ev_sigint, SIGINT, runner_sig_handler, env); + signal_set(&ev_sigterm, SIGTERM, runner_sig_handler, env); + signal_add(&ev_sigint, NULL); + signal_add(&ev_sigterm, NULL); + signal(SIGPIPE, SIG_IGN); + signal(SIGHUP, SIG_IGN); + + /* see fdlimit()-related comment in queue.c */ + fdlimit(1.0); + if ((env->sc_maxconn = availdesc() / 4) < 1) + fatalx("runner: fd starvation"); + + config_pipes(env, peers, nitems(peers)); + config_peers(env, peers, nitems(peers)); + + unlink(PATH_QUEUE "/envelope.tmp"); + runner_reset_flags(); + runner_process_offline(env); + + runner_setup_events(env); + event_dispatch(); + runner_shutdown(); + + return (0); +} + +void +runner_process_offline(struct smtpd *env) +{ + char path[MAXPATHLEN]; + struct qwalk *q; + + q = qwalk_new(PATH_OFFLINE); + + if (qwalk(q, path)) + imsg_compose_event(env->sc_ievs[PROC_QUEUE], + IMSG_PARENT_ENQUEUE_OFFLINE, PROC_PARENT, 0, -1, path, + strlen(path) + 1); + + qwalk_close(q); +} + +void +runner_reset_flags(void) +{ + char path[MAXPATHLEN]; + struct message message; + struct qwalk *q; + + q = qwalk_new(PATH_QUEUE); + + while (qwalk(q, path)) { + while (! queue_load_envelope(&message, basename(path))) + sleep(1); + message_reset_flags(&message); + } + + qwalk_close(q); +} + +void +runner_timeout(int fd, short event, void *p) +{ + struct smtpd *env = p; + struct timeval tv; + + runner_purge_run(); + + runner_process_queue(env); + runner_process_runqueue(env); + runner_process_batchqueue(env); + + tv.tv_sec = 1; + tv.tv_usec = 0; + evtimer_add(&env->sc_ev, &tv); +} + +void +runner_process_queue(struct smtpd *env) +{ + char path[MAXPATHLEN]; + char rqpath[MAXPATHLEN]; + struct message message; + time_t now; + size_t mta_av, mda_av, bnc_av; + struct qwalk *q; + + mta_av = env->sc_maxconn - env->stats->mta.sessions_active; + mda_av = env->sc_maxconn - env->stats->mda.sessions_active; + bnc_av = env->sc_maxconn - env->stats->runner.bounces_active; + + now = time(NULL); + q = qwalk_new(PATH_QUEUE); + + while (qwalk(q, path)) { + if (! queue_load_envelope(&message, basename(path))) + continue; + + if (message.type & T_MDA_MESSAGE) { + if (env->sc_opts & SMTPD_MDA_PAUSED) + continue; + if (mda_av == 0) + continue; + } + + if (message.type & T_MTA_MESSAGE) { + if (env->sc_opts & SMTPD_MTA_PAUSED) + continue; + if (mta_av == 0) + continue; + } + + if (message.type & T_BOUNCE_MESSAGE) { + if (env->sc_opts & (SMTPD_MDA_PAUSED|SMTPD_MTA_PAUSED)) + continue; + if (bnc_av == 0) + continue; + } + + if (! runner_message_schedule(&message, now)) + continue; + + if (runner_check_loop(&message)) { + message_set_errormsg(&message, "loop has been detected"); + bounce_record_message(&message); + queue_remove_envelope(&message); + continue; + } + + message.flags |= F_MESSAGE_SCHEDULED; + message.flags &= ~F_MESSAGE_FORCESCHEDULE; + queue_update_envelope(&message); + + if (! bsnprintf(rqpath, sizeof(rqpath), "%s/%s", PATH_RUNQUEUE, + basename(path))) + fatalx("runner_process_queue: snprintf"); + + if (symlink(path, rqpath) == -1) { + if (errno == EEXIST) + continue; + if (errno == ENOSPC) + break; + fatal("runner_process_queue: symlink"); + } + + if (message.type & T_MDA_MESSAGE) + mda_av--; + if (message.type & T_MTA_MESSAGE) + mta_av--; + if (message.type & T_BOUNCE_MESSAGE) + bnc_av--; + } + + qwalk_close(q); +} + +void +runner_process_runqueue(struct smtpd *env) +{ + char path[MAXPATHLEN]; + struct message message; + time_t tm; + struct batch *batchp; + struct message *messagep; + struct qwalk *q; + + tm = time(NULL); + + q = qwalk_new(PATH_RUNQUEUE); + + while (qwalk(q, path)) { + unlink(path); + + if (! queue_load_envelope(&message, basename(path))) + continue; + + if (message.flags & F_MESSAGE_PROCESSING) + continue; + + message.lasttry = tm; + message.flags &= ~F_MESSAGE_SCHEDULED; + message.flags |= F_MESSAGE_PROCESSING; + + if (! queue_update_envelope(&message)) + continue; + + messagep = calloc(1, sizeof (struct message)); + if (messagep == NULL) + fatal("runner_process_runqueue: calloc"); + *messagep = message; + + messagep->batch_id = 0; + batchp = batch_lookup(env, messagep); + if (batchp != NULL) + messagep->batch_id = batchp->id; + + batchp = batch_record(env, messagep); + if (messagep->batch_id == 0) + messagep->batch_id = batchp->id; + } + + qwalk_close(q); +} + +void +runner_process_batchqueue(struct smtpd *env) +{ + struct batch *batchp; + struct message *m; + int fd; + + while ((batchp = SPLAY_MIN(batchtree, &env->batch_queue)) != NULL) { + switch (batchp->type) { + case T_BOUNCE_BATCH: + while ((m = TAILQ_FIRST(&batchp->messages))) { + imsg_compose_event(env->sc_ievs[PROC_QUEUE], + IMSG_SMTP_ENQUEUE, PROC_SMTP, 0, -1, m, + sizeof *m); + TAILQ_REMOVE(&batchp->messages, m, entry); + free(m); + } + env->stats->runner.bounces_active++; + env->stats->runner.bounces++; + break; + + case T_MDA_BATCH: + m = TAILQ_FIRST(&batchp->messages); + fd = queue_open_message_file(m->message_id); + imsg_compose_event(env->sc_ievs[PROC_QUEUE], + IMSG_MDA_SESS_NEW, PROC_MDA, 0, fd, m, + sizeof *m); + TAILQ_REMOVE(&batchp->messages, m, entry); + free(m); + env->stats->mda.sessions_active++; + env->stats->mda.sessions++; + break; + + case T_MTA_BATCH: + imsg_compose_event(env->sc_ievs[PROC_QUEUE], + IMSG_BATCH_CREATE, PROC_MTA, 0, -1, batchp, + sizeof *batchp); + while ((m = TAILQ_FIRST(&batchp->messages))) { + imsg_compose_event(env->sc_ievs[PROC_QUEUE], + IMSG_BATCH_APPEND, PROC_MTA, 0, -1, m, + sizeof *m); + TAILQ_REMOVE(&batchp->messages, m, entry); + free(m); + } + imsg_compose_event(env->sc_ievs[PROC_QUEUE], + IMSG_BATCH_CLOSE, PROC_MTA, 0, -1, batchp, + sizeof *batchp); + env->stats->mta.sessions_active++; + env->stats->mta.sessions++; + break; + + default: + fatalx("runner_process_batchqueue: unknown type"); + } + + SPLAY_REMOVE(batchtree, &env->batch_queue, batchp); + free(batchp); + } +} + +int +runner_message_schedule(struct message *messagep, time_t tm) +{ + time_t delay; + + if (messagep->flags & (F_MESSAGE_SCHEDULED|F_MESSAGE_PROCESSING)) + return 0; + + if (messagep->flags & F_MESSAGE_FORCESCHEDULE) + return 1; + + /* Batch has been in the queue for too long and expired */ + if (tm - messagep->creation >= SMTPD_QUEUE_EXPIRY) { + message_set_errormsg(messagep, "message expired after sitting in queue for %d days", + SMTPD_QUEUE_EXPIRY / 60 / 60 / 24); + bounce_record_message(messagep); + queue_remove_envelope(messagep); + return 0; + } + + if (messagep->lasttry == 0) + return 1; + + delay = SMTPD_QUEUE_MAXINTERVAL; + + // recompute path + + if (messagep->type == T_MDA_MESSAGE || + messagep->type == T_BOUNCE_MESSAGE) { + if (messagep->retry < 5) + return 1; + + if (messagep->retry < 15) + delay = (messagep->retry * 60) + arc4random_uniform(60); + } + + if (messagep->type == T_MTA_MESSAGE) { + if (messagep->retry < 3) + delay = SMTPD_QUEUE_INTERVAL; + else if (messagep->retry <= 7) { + delay = SMTPD_QUEUE_INTERVAL * (1 << (messagep->retry - 3)); + if (delay > SMTPD_QUEUE_MAXINTERVAL) + delay = SMTPD_QUEUE_MAXINTERVAL; + } + } + + if (tm >= messagep->lasttry + delay) + return 1; + + return 0; +} + +int +runner_force_envelope_schedule(char *mid) +{ + struct message message; + + if (! queue_load_envelope(&message, mid)) + return 0; + + if (message.flags & (F_MESSAGE_PROCESSING|F_MESSAGE_SCHEDULED)) + return 1; + + message.flags |= F_MESSAGE_FORCESCHEDULE; + + if (! queue_update_envelope(&message)) + return 0; + + return 1; +} + +int +runner_force_message_schedule(char *mid) +{ + char path[MAXPATHLEN]; + DIR *dirp; + struct dirent *dp; + + if (! bsnprintf(path, MAXPATHLEN, "%s/%d/%s/envelopes", + PATH_QUEUE, queue_hash(mid), mid)) + return 0; + + dirp = opendir(path); + if (dirp == NULL) + return 0; + + while ((dp = readdir(dirp)) != NULL) { + if (valid_message_uid(dp->d_name)) + runner_force_envelope_schedule(dp->d_name); + } + closedir(dirp); + + return 1; +} + + +int +runner_force_envelope_remove(char *mid) +{ + struct message message; + + if (! queue_load_envelope(&message, mid)) + return 0; + + if (message.flags & (F_MESSAGE_PROCESSING|F_MESSAGE_SCHEDULED)) + return 0; + + if (! queue_remove_envelope(&message)) + return 0; + + return 1; +} + +int +runner_force_message_remove(char *mid) +{ + char path[MAXPATHLEN]; + DIR *dirp; + struct dirent *dp; + + if (! bsnprintf(path, MAXPATHLEN, "%s/%d/%s/envelopes", + PATH_QUEUE, queue_hash(mid), mid)) + return 0; + + dirp = opendir(path); + if (dirp == NULL) + return 0; + + while ((dp = readdir(dirp)) != NULL) { + if (valid_message_uid(dp->d_name)) + runner_force_envelope_remove(dp->d_name); + } + closedir(dirp); + + return 1; +} + +void +runner_purge_run(void) +{ + char path[MAXPATHLEN]; + struct qwalk *q; + + q = qwalk_new(PATH_PURGE); + + while (qwalk(q, path)) + runner_purge_message(basename(path)); + + qwalk_close(q); +} + +void +runner_purge_message(char *msgid) +{ + char rootdir[MAXPATHLEN]; + char evpdir[MAXPATHLEN]; + char evppath[MAXPATHLEN]; + char msgpath[MAXPATHLEN]; + DIR *dirp; + struct dirent *dp; + + if (! bsnprintf(rootdir, sizeof(rootdir), "%s/%s", PATH_PURGE, msgid)) + fatal("runner_purge_message: snprintf"); + + if (! bsnprintf(evpdir, sizeof(evpdir), "%s%s", rootdir, + PATH_ENVELOPES)) + fatal("runner_purge_message: snprintf"); + + if (! bsnprintf(msgpath, sizeof(msgpath), "%s/message", rootdir)) + fatal("runner_purge_message: snprintf"); + + if (unlink(msgpath) == -1) + if (errno != ENOENT) + fatal("runner_purge_message: unlink"); + + dirp = opendir(evpdir); + if (dirp == NULL) { + if (errno == ENOENT) + goto delroot; + fatal("runner_purge_message: opendir"); + } + while ((dp = readdir(dirp)) != NULL) { + if (strcmp(dp->d_name, ".") == 0 || + strcmp(dp->d_name, "..") == 0) + continue; + if (! bsnprintf(evppath, sizeof(evppath), "%s/%s", evpdir, + dp->d_name)) + fatal("runner_purge_message: snprintf"); + + if (unlink(evppath) == -1) + if (errno != ENOENT) + fatal("runner_purge_message: unlink"); + } + closedir(dirp); + + if (rmdir(evpdir) == -1) + if (errno != ENOENT) + fatal("runner_purge_message: rmdir"); + +delroot: + if (rmdir(rootdir) == -1) + if (errno != ENOENT) + fatal("runner_purge_message: rmdir"); +} + +struct batch * +batch_record(struct smtpd *env, struct message *messagep) +{ + struct batch *batchp; + struct path *path; + + batchp = NULL; + if (messagep->batch_id != 0) { + batchp = batch_by_id(env, messagep->batch_id); + if (batchp == NULL) + fatalx("batch_record: internal inconsistency."); + } + if (batchp == NULL) { + batchp = calloc(1, sizeof(struct batch)); + if (batchp == NULL) + fatal("batch_record: calloc"); + + batchp->id = generate_uid(); + + (void)strlcpy(batchp->message_id, messagep->message_id, + sizeof(batchp->message_id)); + TAILQ_INIT(&batchp->messages); + SPLAY_INSERT(batchtree, &env->batch_queue, batchp); + + if (messagep->type & T_BOUNCE_MESSAGE) { + batchp->type = T_BOUNCE_BATCH; + path = &messagep->sender; + } + else { + path = &messagep->recipient; + } + batchp->rule = path->rule; + + (void)strlcpy(batchp->hostname, path->domain, + sizeof(batchp->hostname)); + + if (batchp->type != T_BOUNCE_BATCH) { + if (IS_MAILBOX(*path) || IS_EXT(*path)) { + batchp->type = T_MDA_BATCH; + } + else { + batchp->type = T_MTA_BATCH; + } + } + } + + TAILQ_INSERT_TAIL(&batchp->messages, messagep, entry); + env->stats->runner.active++; + return batchp; +} + +struct batch * +batch_lookup(struct smtpd *env, struct message *message) +{ + struct batch *batchp; + struct batch lookup; + + /* We only support delivery of one message at a time, in MDA + * and bounces messages. + */ + if (message->type == T_BOUNCE_MESSAGE || message->type == T_MDA_MESSAGE) + return NULL; + + /* If message->batch_id != 0, we can retrieve batch by id */ + if (message->batch_id != 0) { + lookup.id = message->batch_id; + return SPLAY_FIND(batchtree, &env->batch_queue, &lookup); + } + + /* We do not know the batch_id yet, maybe it was created but we could not + * be notified, or it just does not exist. Let's scan to see if we can do + * a match based on our message_id and flags. + */ + SPLAY_FOREACH(batchp, batchtree, &env->batch_queue) { + + if (batchp->type != message->type) + continue; + + if (strcasecmp(batchp->message_id, message->message_id) != 0) + continue; + + if (batchp->type & T_MTA_BATCH) + if (strcasecmp(batchp->hostname, message->recipient.domain) != 0) + continue; + + break; + } + + return batchp; +} + +int +batch_cmp(struct batch *s1, struct batch *s2) +{ + /* + * do not return u_int64_t's + */ + if (s1->id < s2->id) + return (-1); + + if (s1->id > s2->id) + return (1); + + return (0); +} + +int +runner_check_loop(struct message *messagep) +{ + int fd; + FILE *fp; + char *buf, *lbuf; + size_t len; + struct path chkpath; + int ret = 0; + int rcvcount = 0; + + fd = queue_open_message_file(messagep->message_id); + if ((fp = fdopen(fd, "r")) == NULL) + fatal("fdopen"); + + lbuf = NULL; + while ((buf = fgetln(fp, &len))) { + if (buf[len - 1] == '\n') + buf[len - 1] = '\0'; + else { + /* EOF without EOL, copy and add the NUL */ + if ((lbuf = malloc(len + 1)) == NULL) + err(1, NULL); + memcpy(lbuf, buf, len); + lbuf[len] = '\0'; + buf = lbuf; + } + + if (strchr(buf, ':') == NULL && !isspace((int)*buf)) + break; + + if (strncasecmp("Received: ", buf, 10) == 0) { + rcvcount++; + if (rcvcount == MAX_HOPS_COUNT) { + ret = 1; + break; + } + } + + else if (strncasecmp("Delivered-To: ", buf, 14) == 0) { + struct path rcpt; + + bzero(&chkpath, sizeof (struct path)); + if (! recipient_to_path(&chkpath, buf + 14)) + continue; + + rcpt = messagep->recipient; + if (messagep->type == T_BOUNCE_MESSAGE) + rcpt = messagep->sender; + + if (strcasecmp(chkpath.user, rcpt.user) == 0 && + strcasecmp(chkpath.domain, rcpt.domain) == 0) { + ret = 1; + break; + } + } + } + free(lbuf); + + fclose(fp); + return ret; +} + +void +message_reset_flags(struct message *m) +{ + m->flags &= ~F_MESSAGE_SCHEDULED; + m->flags &= ~F_MESSAGE_PROCESSING; + + while (! queue_update_envelope(m)) + sleep(1); +} + +SPLAY_GENERATE(batchtree, batch, b_nodes, batch_cmp); |