diff options
author | Jacek Masiulaniec <jacekm@cvs.openbsd.org> | 2009-01-29 12:43:26 +0000 |
---|---|---|
committer | Jacek Masiulaniec <jacekm@cvs.openbsd.org> | 2009-01-29 12:43:26 +0000 |
commit | b9bfb27b66bb75007562fe135ffb4b1be9c4bd16 (patch) | |
tree | 3e55179a54e9e6ef1a8738c757431a88ef0ac674 /usr.sbin/smtpd/runner.c | |
parent | 396bc3cb8b875e7ec1ce1ff0020dd352ae7aecbe (diff) |
Common queue-walking code for smtpd and smtpctl. Kills the majority of showqueue.c;
the remaining code was moved to queue_shared.c. ok gilles@
Diffstat (limited to 'usr.sbin/smtpd/runner.c')
-rw-r--r-- | usr.sbin/smtpd/runner.c | 239 |
1 files changed, 72 insertions, 167 deletions
diff --git a/usr.sbin/smtpd/runner.c b/usr.sbin/smtpd/runner.c index e7dffffdc67..7e4ce43eae2 100644 --- a/usr.sbin/smtpd/runner.c +++ b/usr.sbin/smtpd/runner.c @@ -1,4 +1,4 @@ -/* $OpenBSD: runner.c,v 1.26 2009/01/28 17:43:45 gilles Exp $ */ +/* $OpenBSD: runner.c,v 1.27 2009/01/29 12:43:25 jacekm Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> @@ -34,6 +34,7 @@ #include <errno.h> #include <event.h> #include <fcntl.h> +#include <libgen.h> #include <netdb.h> #include <pwd.h> #include <signal.h> @@ -55,17 +56,14 @@ void runner_dispatch_lka(int, short, void *); void runner_setup_events(struct smtpd *); void runner_disable_events(struct smtpd *); -int runner_reset_flags(void); +void runner_reset_flags(void); void runner_timeout(int, short, void *); -void runner_process_envelope(struct message *); +void runner_process_queue(struct smtpd *); void runner_process_runqueue(struct smtpd *); void runner_process_batchqueue(struct smtpd *); -void runner_envelope_rewind(void); -int runner_envelope_next(struct message *); - int runner_batch_resolved(struct smtpd *, struct batch *); void runner_batch_dispatch(struct smtpd *, struct batch *, time_t); @@ -79,8 +77,6 @@ int runner_check_loop(struct message *); struct batch *batch_record(struct smtpd *, struct message *); struct batch *batch_lookup(struct smtpd *, struct message *); -static DIR *curdir, *dir_queue, *dir_bucket, *dir_envelope; - void runner_sig_handler(int sig, short event, void *p) { @@ -431,8 +427,7 @@ runner(struct smtpd *env) config_peers(env, peers, 5); - while (! runner_reset_flags()) - sleep(1); + runner_reset_flags(); runner_setup_events(env); event_dispatch(); @@ -441,46 +436,38 @@ runner(struct smtpd *env) return (0); } -int +void runner_reset_flags(void) { - struct message message; + char path[MAXPATHLEN]; + struct message message; + struct qwalk *q; + + q = qwalk_new(PATH_QUEUE); - runner_envelope_rewind(); + while (qwalk(q, path)) { + while (! 
queue_load_envelope(&message, basename(path))) + sleep(1); - while (runner_envelope_next(&message)) { message.flags &= ~F_MESSAGE_SCHEDULED; message.flags &= ~F_MESSAGE_PROCESSING; - if (! queue_update_envelope(&message)) - return 0; + while (! queue_update_envelope(&message)) + sleep(1); } - return 1; + qwalk_close(q); } void runner_timeout(int fd, short event, void *p) { struct smtpd *env = p; - struct message message; struct timeval tv; runner_purge_run(); - runner_envelope_rewind(); - - while (runner_envelope_next(&message)) { - if (message.type & T_MDA_MESSAGE) - if (env->sc_opts & SMTPD_MDA_PAUSED) - continue; - - if (message.type & T_MTA_MESSAGE) - if (env->sc_opts & SMTPD_MTA_PAUSED) - continue; - - runner_process_envelope(&message); - } + runner_process_queue(env); runner_process_runqueue(env); runner_process_batchqueue(env); @@ -490,68 +477,70 @@ runner_timeout(int fd, short event, void *p) } void -runner_process_envelope(struct message *messagep) +runner_process_queue(struct smtpd *env) { - char evppath[MAXPATHLEN]; - char rqpath[MAXPATHLEN]; - struct stat sb; + char path[MAXPATHLEN]; + char rqpath[MAXPATHLEN]; + struct message message; + time_t now; + struct qwalk *q; - if (! runner_message_schedule(messagep, time(NULL))) - return; + q = qwalk_new(PATH_QUEUE); - runner_check_loop(messagep); + while (qwalk(q, path)) { + if (! queue_load_envelope(&message, basename(path))) + continue; - messagep->flags |= F_MESSAGE_SCHEDULED; - if (! queue_update_envelope(messagep)) - return; + if (message.type & T_MDA_MESSAGE) + if (env->sc_opts & SMTPD_MDA_PAUSED) + continue; - if (! bsnprintf(evppath, MAXPATHLEN, "%s/%d/%s%s/%s", PATH_QUEUE, - queue_hash(messagep->message_id), messagep->message_id, - PATH_ENVELOPES, messagep->message_uid)) - fatal("runner_process_envelope: snprintf"); + if (message.type & T_MTA_MESSAGE) + if (env->sc_opts & SMTPD_MTA_PAUSED) + continue; - if (! 
bsnprintf(rqpath, MAXPATHLEN, "%s/%s", PATH_RUNQUEUE, - messagep->message_uid)) - fatal("runner_process_envelope: snprintf"); + if (! runner_message_schedule(&message, now)) + continue; - if (stat(rqpath, &sb) == -1) { - if (errno != ENOENT) - fatal("runner_process_envelope: stat"); + runner_check_loop(&message); - if (symlink(evppath, rqpath) == -1) { - log_info("queue_process_envelope: " - "failed to place envelope in runqueue"); + message.flags |= F_MESSAGE_SCHEDULED; + queue_update_envelope(&message); + + if (! bsnprintf(rqpath, sizeof(rqpath), "%s/%s", PATH_RUNQUEUE, + basename(path))) + fatalx("runner_process_queue: snprintf"); + + if (symlink(path, rqpath) == -1) { + if (errno == EEXIST) + continue; + if (errno == ENOSPC) + break; + fatal("runner_process_queue: symlink"); } } + + qwalk_close(q); } void runner_process_runqueue(struct smtpd *env) { - DIR *dirp; - struct dirent *dp; - struct message message; - struct message *messagep; - struct batch *batchp; - char pathname[MAXPATHLEN]; - time_t tm; + char path[MAXPATHLEN]; + struct message message; + time_t tm; + struct batch *batchp; + struct message *messagep; + struct qwalk *q; tm = time(NULL); - dirp = opendir(PATH_RUNQUEUE); - if (dirp == NULL) - fatal("runner_process_runqueue: opendir"); + q = qwalk_new(PATH_RUNQUEUE); - while ((dp = readdir(dirp)) != NULL) { - if (strcmp(dp->d_name, ".") == 0 || - strcmp(dp->d_name, "..") == 0) - continue; - - snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_RUNQUEUE, - dp->d_name); - unlink(pathname); + while (qwalk(q, path)) { + unlink(path); - if (! queue_load_envelope(&message, dp->d_name)) + if (! 
queue_load_envelope(&message, basename(path))) continue; if (message.flags & F_MESSAGE_PROCESSING) @@ -578,7 +567,7 @@ runner_process_runqueue(struct smtpd *env) messagep->batch_id = batchp->id; } - closedir(dirp); + qwalk_close(q); } void @@ -605,80 +594,6 @@ runner_process_batchqueue(struct smtpd *env) } } -void -runner_envelope_rewind(void) -{ - if (dir_queue != NULL) { - closedir(dir_queue); - closedir(dir_bucket); - closedir(dir_envelope); - } - - curdir = dir_queue = opendir(PATH_QUEUE); - if (curdir == NULL) - fatal("runner_envelope_rewind: opendir"); -} - -int -runner_envelope_next(struct message *messagep) -{ - static int bucket; - char path[MAXPATHLEN]; - const char *errstr; - struct dirent *dp; - -again: - dp = readdir(curdir); - - if (dp == NULL) - closedir(curdir); - else if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0) - goto again; - - if (curdir == dir_queue) { - if (dp == NULL) { - dir_queue = NULL; - return (0); - } - bucket = strtonum(dp->d_name, 0, DIRHASH_BUCKETS - 1, &errstr); - if (errstr) { - log_warnx("runner_envelope_next: invalid bucket: %s/%s", - PATH_QUEUE, dp->d_name); - goto again; - } - if (! bsnprintf(path, MAXPATHLEN, "%s/%d", PATH_QUEUE, bucket)) - fatalx("runner_envelope_next: snprintf"); - curdir = dir_bucket = opendir(path); - goto recurse; - } - - if (curdir == dir_bucket) { - if (dp == NULL) { - curdir = dir_queue; - goto again; - } - if (! 
bsnprintf(path, MAXPATHLEN, "%s/%d/%s%s", PATH_QUEUE, - bucket, dp->d_name, PATH_ENVELOPES)) - fatalx("runner_envelope_next: snprintf"); - curdir = dir_envelope = opendir(path); - goto recurse; - } - - if (curdir == dir_envelope) { - if (dp == NULL) { - curdir = dir_bucket; - goto again; - } - queue_load_envelope(messagep, dp->d_name); - return (1); - } - -recurse: - if (curdir == NULL) - fatal("runner_envelope_next: opendir failed"); - goto again; -} - int runner_batch_resolved(struct smtpd *env, struct batch *lookup) { @@ -826,31 +741,21 @@ runner_message_schedule(struct message *messagep, time_t tm) void runner_purge_run(void) { - DIR *dirp; - struct dirent *dp; - - dirp = opendir(PATH_PURGE); - if (dirp == NULL) - fatal("runner_purge_run: opendir"); + char path[MAXPATHLEN]; + struct qwalk *q; - while ((dp = readdir(dirp)) != NULL) { - if (strcmp(dp->d_name, ".") == 0 || - strcmp(dp->d_name, "..") == 0) { - continue; - } - if (strcmp(dp->d_name, "envelope.tmp") == 0) { - char path[MAXPATHLEN]; + q = qwalk_new(PATH_PURGE); - if (! bsnprintf(path, MAXPATHLEN, "%s/envelope.tmp", - PATH_PURGE)) - fatalx("runner_purge_run: snprintf"); + while (qwalk(q, path)) { + if (strcmp(basename(path), "envelope.tmp") == 0) { if (unlink(path) == -1) fatal("runner_purge_run: unlink"); - } else - runner_purge_message(dp->d_name); + continue; + } + runner_purge_message(basename(path)); } - closedir(dirp); + qwalk_close(q); } void |