summaryrefslogtreecommitdiff
path: root/usr.sbin/smtpd/runner.c
diff options
context:
space:
mode:
authorJacek Masiulaniec <jacekm@cvs.openbsd.org>2009-01-29 12:43:26 +0000
committerJacek Masiulaniec <jacekm@cvs.openbsd.org>2009-01-29 12:43:26 +0000
commitb9bfb27b66bb75007562fe135ffb4b1be9c4bd16 (patch)
tree3e55179a54e9e6ef1a8738c757431a88ef0ac674 /usr.sbin/smtpd/runner.c
parent396bc3cb8b875e7ec1ce1ff0020dd352ae7aecbe (diff)
Common queue walking code for smtpd and smtpctl. Kills majority of showqueue.c,
the remaining code was moved to queue_shared.c; ok gilles@
Diffstat (limited to 'usr.sbin/smtpd/runner.c')
-rw-r--r--usr.sbin/smtpd/runner.c239
1 files changed, 72 insertions, 167 deletions
diff --git a/usr.sbin/smtpd/runner.c b/usr.sbin/smtpd/runner.c
index e7dffffdc67..7e4ce43eae2 100644
--- a/usr.sbin/smtpd/runner.c
+++ b/usr.sbin/smtpd/runner.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: runner.c,v 1.26 2009/01/28 17:43:45 gilles Exp $ */
+/* $OpenBSD: runner.c,v 1.27 2009/01/29 12:43:25 jacekm Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -34,6 +34,7 @@
#include <errno.h>
#include <event.h>
#include <fcntl.h>
+#include <libgen.h>
#include <netdb.h>
#include <pwd.h>
#include <signal.h>
@@ -55,17 +56,14 @@ void runner_dispatch_lka(int, short, void *);
void runner_setup_events(struct smtpd *);
void runner_disable_events(struct smtpd *);
-int runner_reset_flags(void);
+void runner_reset_flags(void);
void runner_timeout(int, short, void *);
-void runner_process_envelope(struct message *);
+void runner_process_queue(struct smtpd *);
void runner_process_runqueue(struct smtpd *);
void runner_process_batchqueue(struct smtpd *);
-void runner_envelope_rewind(void);
-int runner_envelope_next(struct message *);
-
int runner_batch_resolved(struct smtpd *, struct batch *);
void runner_batch_dispatch(struct smtpd *, struct batch *, time_t);
@@ -79,8 +77,6 @@ int runner_check_loop(struct message *);
struct batch *batch_record(struct smtpd *, struct message *);
struct batch *batch_lookup(struct smtpd *, struct message *);
-static DIR *curdir, *dir_queue, *dir_bucket, *dir_envelope;
-
void
runner_sig_handler(int sig, short event, void *p)
{
@@ -431,8 +427,7 @@ runner(struct smtpd *env)
config_peers(env, peers, 5);
- while (! runner_reset_flags())
- sleep(1);
+ runner_reset_flags();
runner_setup_events(env);
event_dispatch();
@@ -441,46 +436,38 @@ runner(struct smtpd *env)
return (0);
}
-int
+void
runner_reset_flags(void)
{
- struct message message;
+ char path[MAXPATHLEN];
+ struct message message;
+ struct qwalk *q;
+
+ q = qwalk_new(PATH_QUEUE);
- runner_envelope_rewind();
+ while (qwalk(q, path)) {
+ while (! queue_load_envelope(&message, basename(path)))
+ sleep(1);
- while (runner_envelope_next(&message)) {
message.flags &= ~F_MESSAGE_SCHEDULED;
message.flags &= ~F_MESSAGE_PROCESSING;
- if (! queue_update_envelope(&message))
- return 0;
+ while (! queue_update_envelope(&message))
+ sleep(1);
}
- return 1;
+ qwalk_close(q);
}
void
runner_timeout(int fd, short event, void *p)
{
struct smtpd *env = p;
- struct message message;
struct timeval tv;
runner_purge_run();
- runner_envelope_rewind();
-
- while (runner_envelope_next(&message)) {
- if (message.type & T_MDA_MESSAGE)
- if (env->sc_opts & SMTPD_MDA_PAUSED)
- continue;
-
- if (message.type & T_MTA_MESSAGE)
- if (env->sc_opts & SMTPD_MTA_PAUSED)
- continue;
-
- runner_process_envelope(&message);
- }
+ runner_process_queue(env);
runner_process_runqueue(env);
runner_process_batchqueue(env);
@@ -490,68 +477,70 @@ runner_timeout(int fd, short event, void *p)
}
void
-runner_process_envelope(struct message *messagep)
+runner_process_queue(struct smtpd *env)
{
- char evppath[MAXPATHLEN];
- char rqpath[MAXPATHLEN];
- struct stat sb;
+ char path[MAXPATHLEN];
+ char rqpath[MAXPATHLEN];
+ struct message message;
+ time_t now;
+ struct qwalk *q;
- if (! runner_message_schedule(messagep, time(NULL)))
- return;
+ q = qwalk_new(PATH_QUEUE);
- runner_check_loop(messagep);
+ while (qwalk(q, path)) {
+ if (! queue_load_envelope(&message, basename(path)))
+ continue;
- messagep->flags |= F_MESSAGE_SCHEDULED;
- if (! queue_update_envelope(messagep))
- return;
+ if (message.type & T_MDA_MESSAGE)
+ if (env->sc_opts & SMTPD_MDA_PAUSED)
+ continue;
- if (! bsnprintf(evppath, MAXPATHLEN, "%s/%d/%s%s/%s", PATH_QUEUE,
- queue_hash(messagep->message_id), messagep->message_id,
- PATH_ENVELOPES, messagep->message_uid))
- fatal("runner_process_envelope: snprintf");
+ if (message.type & T_MTA_MESSAGE)
+ if (env->sc_opts & SMTPD_MTA_PAUSED)
+ continue;
- if (! bsnprintf(rqpath, MAXPATHLEN, "%s/%s", PATH_RUNQUEUE,
- messagep->message_uid))
- fatal("runner_process_envelope: snprintf");
+ if (! runner_message_schedule(&message, now))
+ continue;
- if (stat(rqpath, &sb) == -1) {
- if (errno != ENOENT)
- fatal("runner_process_envelope: stat");
+ runner_check_loop(&message);
- if (symlink(evppath, rqpath) == -1) {
- log_info("queue_process_envelope: "
- "failed to place envelope in runqueue");
+ message.flags |= F_MESSAGE_SCHEDULED;
+ queue_update_envelope(&message);
+
+ if (! bsnprintf(rqpath, sizeof(rqpath), "%s/%s", PATH_RUNQUEUE,
+ basename(path)))
+ fatalx("runner_process_queue: snprintf");
+
+ if (symlink(path, rqpath) == -1) {
+ if (errno == EEXIST)
+ continue;
+ if (errno == ENOSPC)
+ break;
+ fatal("runner_process_queue: symlink");
}
}
+
+ qwalk_close(q);
}
void
runner_process_runqueue(struct smtpd *env)
{
- DIR *dirp;
- struct dirent *dp;
- struct message message;
- struct message *messagep;
- struct batch *batchp;
- char pathname[MAXPATHLEN];
- time_t tm;
+ char path[MAXPATHLEN];
+ struct message message;
+ time_t tm;
+ struct batch *batchp;
+ struct message *messagep;
+ struct qwalk *q;
tm = time(NULL);
- dirp = opendir(PATH_RUNQUEUE);
- if (dirp == NULL)
- fatal("runner_process_runqueue: opendir");
+ q = qwalk_new(PATH_RUNQUEUE);
- while ((dp = readdir(dirp)) != NULL) {
- if (strcmp(dp->d_name, ".") == 0 ||
- strcmp(dp->d_name, "..") == 0)
- continue;
-
- snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_RUNQUEUE,
- dp->d_name);
- unlink(pathname);
+ while (qwalk(q, path)) {
+ unlink(path);
- if (! queue_load_envelope(&message, dp->d_name))
+ if (! queue_load_envelope(&message, basename(path)))
continue;
if (message.flags & F_MESSAGE_PROCESSING)
@@ -578,7 +567,7 @@ runner_process_runqueue(struct smtpd *env)
messagep->batch_id = batchp->id;
}
- closedir(dirp);
+ qwalk_close(q);
}
void
@@ -605,80 +594,6 @@ runner_process_batchqueue(struct smtpd *env)
}
}
-void
-runner_envelope_rewind(void)
-{
- if (dir_queue != NULL) {
- closedir(dir_queue);
- closedir(dir_bucket);
- closedir(dir_envelope);
- }
-
- curdir = dir_queue = opendir(PATH_QUEUE);
- if (curdir == NULL)
- fatal("runner_envelope_rewind: opendir");
-}
-
-int
-runner_envelope_next(struct message *messagep)
-{
- static int bucket;
- char path[MAXPATHLEN];
- const char *errstr;
- struct dirent *dp;
-
-again:
- dp = readdir(curdir);
-
- if (dp == NULL)
- closedir(curdir);
- else if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0)
- goto again;
-
- if (curdir == dir_queue) {
- if (dp == NULL) {
- dir_queue = NULL;
- return (0);
- }
- bucket = strtonum(dp->d_name, 0, DIRHASH_BUCKETS - 1, &errstr);
- if (errstr) {
- log_warnx("runner_envelope_next: invalid bucket: %s/%s",
- PATH_QUEUE, dp->d_name);
- goto again;
- }
- if (! bsnprintf(path, MAXPATHLEN, "%s/%d", PATH_QUEUE, bucket))
- fatalx("runner_envelope_next: snprintf");
- curdir = dir_bucket = opendir(path);
- goto recurse;
- }
-
- if (curdir == dir_bucket) {
- if (dp == NULL) {
- curdir = dir_queue;
- goto again;
- }
- if (! bsnprintf(path, MAXPATHLEN, "%s/%d/%s%s", PATH_QUEUE,
- bucket, dp->d_name, PATH_ENVELOPES))
- fatalx("runner_envelope_next: snprintf");
- curdir = dir_envelope = opendir(path);
- goto recurse;
- }
-
- if (curdir == dir_envelope) {
- if (dp == NULL) {
- curdir = dir_bucket;
- goto again;
- }
- queue_load_envelope(messagep, dp->d_name);
- return (1);
- }
-
-recurse:
- if (curdir == NULL)
- fatal("runner_envelope_next: opendir failed");
- goto again;
-}
-
int
runner_batch_resolved(struct smtpd *env, struct batch *lookup)
{
@@ -826,31 +741,21 @@ runner_message_schedule(struct message *messagep, time_t tm)
void
runner_purge_run(void)
{
- DIR *dirp;
- struct dirent *dp;
-
- dirp = opendir(PATH_PURGE);
- if (dirp == NULL)
- fatal("runner_purge_run: opendir");
+ char path[MAXPATHLEN];
+ struct qwalk *q;
- while ((dp = readdir(dirp)) != NULL) {
- if (strcmp(dp->d_name, ".") == 0 ||
- strcmp(dp->d_name, "..") == 0) {
- continue;
- }
- if (strcmp(dp->d_name, "envelope.tmp") == 0) {
- char path[MAXPATHLEN];
+ q = qwalk_new(PATH_PURGE);
- if (! bsnprintf(path, MAXPATHLEN, "%s/envelope.tmp",
- PATH_PURGE))
- fatalx("runner_purge_run: snprintf");
+ while (qwalk(q, path)) {
+ if (strcmp(basename(path), "envelope.tmp") == 0) {
if (unlink(path) == -1)
fatal("runner_purge_run: unlink");
- } else
- runner_purge_message(dp->d_name);
+ continue;
+ }
+ runner_purge_message(basename(path));
}
- closedir(dirp);
+ qwalk_close(q);
}
void