summary refs log tree commit diff
path: root/usr.sbin/smtpd/runner.c
diff options
context:
space:
mode:
author Jacek Masiulaniec <jacekm@cvs.openbsd.org> 2009-01-06 23:02:08 +0000
committer Jacek Masiulaniec <jacekm@cvs.openbsd.org> 2009-01-06 23:02:08 +0000
commit 16051086cdf431871bdba21474ce799c9a6207b2 (patch)
tree fb18a48a08811fb9eb912f70d91e5faba3571da7 /usr.sbin/smtpd/runner.c
parent 38de3313718facbf1f88f8ea1cedf3d1cf26d9fe (diff)
rework /queue traversal; ok gilles@
Diffstat (limited to 'usr.sbin/smtpd/runner.c')
-rw-r--r-- usr.sbin/smtpd/runner.c 211
1 files changed, 110 insertions, 101 deletions
diff --git a/usr.sbin/smtpd/runner.c b/usr.sbin/smtpd/runner.c
index 40d4efbd2fd..38a353a25a2 100644
--- a/usr.sbin/smtpd/runner.c
+++ b/usr.sbin/smtpd/runner.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: runner.c,v 1.20 2009/01/06 20:17:23 jacekm Exp $ */
+/* $OpenBSD: runner.c,v 1.21 2009/01/06 23:02:07 jacekm Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -54,13 +54,13 @@ void runner_setup_events(struct smtpd *);
void runner_disable_events(struct smtpd *);
void runner_timeout(int, short, void *);
-void runner_process_queue(struct smtpd *);
-void runner_process_bucket(struct smtpd *, u_int16_t);
-void runner_process_message(struct smtpd *, char *);
-void runner_process_envelope(struct smtpd *, char *, char *);
+void runner_process_envelope(struct message *);
void runner_process_runqueue(struct smtpd *);
void runner_process_batchqueue(struct smtpd *);
+void runner_envelope_rewind(void);
+int runner_envelope_next(struct message *);
+
int runner_batch_resolved(struct smtpd *, struct batch *);
void runner_batch_dispatch(struct smtpd *, struct batch *, time_t);
@@ -74,6 +74,8 @@ struct batch *batch_lookup(struct smtpd *, struct message *);
u_int8_t flagreset = 1;
+static DIR *curdir, *dir_queue, *dir_bucket, *dir_envelope;
+
void
runner_sig_handler(int sig, short event, void *p)
{
@@ -432,10 +434,24 @@ void
runner_timeout(int fd, short event, void *p)
{
struct smtpd *env = p;
+ struct message message;
struct timeval tv;
runner_purge_run();
- runner_process_queue(env);
+ runner_envelope_rewind();
+
+ while (runner_envelope_next(&message)) {
+ if (message.type & T_MDA_MESSAGE)
+ if (env->sc_opts & SMTPD_MDA_PAUSED)
+ continue;
+
+ if (message.type & T_MTA_MESSAGE)
+ if (env->sc_opts & SMTPD_MTA_PAUSED)
+ continue;
+
+ runner_process_envelope(&message);
+ }
+
flagreset = 0;
runner_process_runqueue(env);
runner_process_batchqueue(env);
@@ -446,114 +462,30 @@ runner_timeout(int fd, short event, void *p)
}
void
-runner_process_queue(struct smtpd *env)
-{
- DIR *dirp;
- struct dirent *dp;
- const char *errstr;
- u_int16_t bucket;
-
- dirp = opendir(PATH_QUEUE);
- if (dirp == NULL)
- fatal("runner_process_queue: opendir");
- while ((dp = readdir(dirp)) != NULL) {
- if (strcmp(dp->d_name, ".") == 0 ||
- strcmp(dp->d_name, "..") == 0)
- continue;
- bucket = strtonum(dp->d_name, 0, DIRHASH_BUCKETS - 1, &errstr);
- if (errstr) {
- log_warnx("runner_process_queue: invalid bucket: %s/%s",
- PATH_QUEUE, dp->d_name);
- continue;
- }
- runner_process_bucket(env, bucket);
- }
- closedir(dirp);
-}
-
-void
-runner_process_bucket(struct smtpd *env, u_int16_t bucket)
-{
- DIR *dirp = NULL;
- struct dirent *dp;
- char bucketpath[MAXPATHLEN];
-
- if (! bsnprintf(bucketpath, MAXPATHLEN, "%s/%d", PATH_QUEUE, bucket))
- fatal("runner_process_bucket: snprintf");
- dirp = opendir(bucketpath);
- if (dirp == NULL)
- return;
- while ((dp = readdir(dirp)) != NULL) {
- if (strcmp(dp->d_name, ".") == 0 ||
- strcmp(dp->d_name, "..") == 0)
- continue;
- runner_process_message(env, dp->d_name);
- }
- closedir(dirp);
-}
-
-void
-runner_process_message(struct smtpd *env, char *messageid)
-{
- DIR *dirp = NULL;
- struct dirent *dp;
- char evppath[MAXPATHLEN];
- u_int16_t hval;
-
- hval = queue_hash(messageid);
- if (! bsnprintf(evppath, MAXPATHLEN, "%s/%d/%s%s", PATH_QUEUE, hval,
- messageid, PATH_ENVELOPES))
- fatal("runner_process_message: snprintf");
- dirp = opendir(evppath);
- if (dirp == NULL)
- return;
- while ((dp = readdir(dirp)) != NULL) {
- if (strcmp(dp->d_name, ".") == 0 ||
- strcmp(dp->d_name, "..") == 0)
- continue;
- runner_process_envelope(env, dp->d_name, dp->d_name);
- }
- closedir(dirp);
-}
-
-void
-runner_process_envelope(struct smtpd *env, char *msgid, char *evpid)
+runner_process_envelope(struct message *messagep)
{
- struct message message;
- time_t tm;
char evppath[MAXPATHLEN];
char rqpath[MAXPATHLEN];
- u_int16_t hval;
struct stat sb;
- if (! queue_load_envelope(&message, evpid))
- return;
-
- tm = time(NULL);
-
if (flagreset) {
- message.flags &= ~(F_MESSAGE_SCHEDULED|F_MESSAGE_PROCESSING);
- queue_update_envelope(&message);
+ messagep->flags &= ~(F_MESSAGE_SCHEDULED|F_MESSAGE_PROCESSING);
+ queue_update_envelope(messagep);
}
- if ((message.type & T_MDA_MESSAGE) && (env->sc_opts & SMTPD_MDA_PAUSED))
- return;
-
- if ((message.type & T_MTA_MESSAGE) && (env->sc_opts & SMTPD_MTA_PAUSED))
- return;
-
- if (! runner_message_schedule(&message, tm))
+ if (! runner_message_schedule(messagep, time(NULL)))
return;
- message.flags |= F_MESSAGE_SCHEDULED;
- queue_update_envelope(&message);
+ messagep->flags |= F_MESSAGE_SCHEDULED;
+ queue_update_envelope(messagep);
- hval = queue_hash(msgid);
- if (! bsnprintf(evppath, MAXPATHLEN, "%s/%d/%s%s/%s", PATH_QUEUE, hval,
- msgid, PATH_ENVELOPES, evpid))
+ if (! bsnprintf(evppath, MAXPATHLEN, "%s/%d/%s%s/%s", PATH_QUEUE,
+ queue_hash(messagep->message_id), messagep->message_id,
+ PATH_ENVELOPES, messagep->message_uid))
fatal("runner_process_envelope: snprintf");
- if (! bsnprintf(rqpath, MAXPATHLEN, "%s/%s", PATH_RUNQUEUE, evpid))
+ if (! bsnprintf(rqpath, MAXPATHLEN, "%s/%s", PATH_RUNQUEUE,
+ messagep->message_uid))
fatal("runner_process_envelope: snprintf");
if (stat(rqpath, &sb) == -1) {
@@ -645,6 +577,83 @@ runner_process_batchqueue(struct smtpd *env)
}
}
+void
+runner_envelope_rewind(void)
+{
+ if (dir_queue != NULL) {
+ closedir(dir_queue);
+ closedir(dir_bucket);
+ closedir(dir_envelope);
+ }
+
+ curdir = dir_queue = opendir(PATH_QUEUE);
+ if (curdir == NULL)
+ fatal("runner_envelope_rewind: opendir");
+}
+
+int
+runner_envelope_next(struct message *messagep)
+{
+ static int bucket;
+ char path[MAXPATHLEN];
+ const char *errstr;
+ struct dirent *dp;
+
+again:
+ dp = readdir(curdir);
+
+ if (dp == NULL)
+ closedir(curdir);
+ else if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0)
+ goto again;
+
+ if (curdir == dir_queue) {
+ if (dp == NULL) {
+ dir_queue = NULL;
+ return (0);
+ }
+ bucket = strtonum(dp->d_name, 0, DIRHASH_BUCKETS - 1, &errstr);
+ if (errstr) {
+ log_warnx("runner_envelope_next: invalid bucket: %s/%s",
+ PATH_QUEUE, dp->d_name);
+ goto again;
+ }
+ if (! bsnprintf(path, MAXPATHLEN, "%s/%d", PATH_QUEUE, bucket))
+ fatalx("runner_envelope_next: snprintf");
+ curdir = dir_bucket = opendir(path);
+ goto recurse;
+ }
+
+ if (curdir == dir_bucket) {
+ if (dp == NULL) {
+ curdir = dir_queue;
+ goto again;
+ }
+ if (! bsnprintf(path, MAXPATHLEN, "%s/%d/%s%s", PATH_QUEUE,
+ bucket, dp->d_name, PATH_ENVELOPES))
+ fatalx("runner_envelope_next: snprintf");
+ curdir = dir_envelope = opendir(path);
+ goto recurse;
+ }
+
+ if (curdir == dir_envelope) {
+ if (dp == NULL) {
+ curdir = dir_bucket;
+ goto again;
+ }
+ queue_load_envelope(messagep, dp->d_name);
+ return (1);
+ }
+
+recurse:
+ if (curdir == NULL) {
+ if (errno == ENOENT)
+ goto again;
+ fatal("runner_envelope_next: opendir failed");
+ }
+ goto again;
+}
+
int
runner_batch_resolved(struct smtpd *env, struct batch *lookup)
{