From 16051086cdf431871bdba21474ce799c9a6207b2 Mon Sep 17 00:00:00 2001 From: Jacek Masiulaniec Date: Tue, 6 Jan 2009 23:02:08 +0000 Subject: rework /queue traversal; ok gilles@ --- usr.sbin/smtpd/runner.c | 211 +++++++++++++++++++++++++----------------------- 1 file changed, 110 insertions(+), 101 deletions(-) diff --git a/usr.sbin/smtpd/runner.c b/usr.sbin/smtpd/runner.c index 40d4efbd2fd..38a353a25a2 100644 --- a/usr.sbin/smtpd/runner.c +++ b/usr.sbin/smtpd/runner.c @@ -1,4 +1,4 @@ -/* $OpenBSD: runner.c,v 1.20 2009/01/06 20:17:23 jacekm Exp $ */ +/* $OpenBSD: runner.c,v 1.21 2009/01/06 23:02:07 jacekm Exp $ */ /* * Copyright (c) 2008 Gilles Chehade @@ -54,13 +54,13 @@ void runner_setup_events(struct smtpd *); void runner_disable_events(struct smtpd *); void runner_timeout(int, short, void *); -void runner_process_queue(struct smtpd *); -void runner_process_bucket(struct smtpd *, u_int16_t); -void runner_process_message(struct smtpd *, char *); -void runner_process_envelope(struct smtpd *, char *, char *); +void runner_process_envelope(struct message *); void runner_process_runqueue(struct smtpd *); void runner_process_batchqueue(struct smtpd *); +void runner_envelope_rewind(void); +int runner_envelope_next(struct message *); + int runner_batch_resolved(struct smtpd *, struct batch *); void runner_batch_dispatch(struct smtpd *, struct batch *, time_t); @@ -74,6 +74,8 @@ struct batch *batch_lookup(struct smtpd *, struct message *); u_int8_t flagreset = 1; +static DIR *curdir, *dir_queue, *dir_bucket, *dir_envelope; + void runner_sig_handler(int sig, short event, void *p) { @@ -432,10 +434,24 @@ void runner_timeout(int fd, short event, void *p) { struct smtpd *env = p; + struct message message; struct timeval tv; runner_purge_run(); - runner_process_queue(env); + runner_envelope_rewind(); + + while (runner_envelope_next(&message)) { + if (message.type & T_MDA_MESSAGE) + if (env->sc_opts & SMTPD_MDA_PAUSED) + continue; + + if (message.type & T_MTA_MESSAGE) + if (env->sc_opts & SMTPD_MTA_PAUSED) + continue; + + runner_process_envelope(&message); + } + flagreset = 0; runner_process_runqueue(env); runner_process_batchqueue(env); @@ -446,114 +462,30 @@ runner_timeout(int fd, short event, void *p) } void -runner_process_queue(struct smtpd *env) -{ - DIR *dirp; - struct dirent *dp; - const char *errstr; - u_int16_t bucket; - - dirp = opendir(PATH_QUEUE); - if (dirp == NULL) - fatal("runner_process_queue: opendir"); - while ((dp = readdir(dirp)) != NULL) { - if (strcmp(dp->d_name, ".") == 0 || - strcmp(dp->d_name, "..") == 0) - continue; - bucket = strtonum(dp->d_name, 0, DIRHASH_BUCKETS - 1, &errstr); - if (errstr) { - log_warnx("runner_process_queue: invalid bucket: %s/%s", - PATH_QUEUE, dp->d_name); - continue; - } - runner_process_bucket(env, bucket); - } - closedir(dirp); -} - -void -runner_process_bucket(struct smtpd *env, u_int16_t bucket) -{ - DIR *dirp = NULL; - struct dirent *dp; - char bucketpath[MAXPATHLEN]; - - if (! bsnprintf(bucketpath, MAXPATHLEN, "%s/%d", PATH_QUEUE, bucket)) - fatal("runner_process_bucket: snprintf"); - dirp = opendir(bucketpath); - if (dirp == NULL) - return; - while ((dp = readdir(dirp)) != NULL) { - if (strcmp(dp->d_name, ".") == 0 || - strcmp(dp->d_name, "..") == 0) - continue; - runner_process_message(env, dp->d_name); - } - closedir(dirp); -} - -void -runner_process_message(struct smtpd *env, char *messageid) -{ - DIR *dirp = NULL; - struct dirent *dp; - char evppath[MAXPATHLEN]; - u_int16_t hval; - - hval = queue_hash(messageid); - if (! bsnprintf(evppath, MAXPATHLEN, "%s/%d/%s%s", PATH_QUEUE, hval, - messageid, PATH_ENVELOPES)) - fatal("runner_process_message: snprintf"); - dirp = opendir(evppath); - if (dirp == NULL) - return; - while ((dp = readdir(dirp)) != NULL) { - if (strcmp(dp->d_name, ".") == 0 || - strcmp(dp->d_name, "..") == 0) - continue; - runner_process_envelope(env, dp->d_name, dp->d_name); - } - closedir(dirp); -} - -void -runner_process_envelope(struct smtpd *env, char *msgid, char *evpid) +runner_process_envelope(struct message *messagep) { - struct message message; - time_t tm; char evppath[MAXPATHLEN]; char rqpath[MAXPATHLEN]; - u_int16_t hval; struct stat sb; - if (! queue_load_envelope(&message, evpid)) - return; - - tm = time(NULL); - if (flagreset) { - message.flags &= ~(F_MESSAGE_SCHEDULED|F_MESSAGE_PROCESSING); - queue_update_envelope(&message); + messagep->flags &= ~(F_MESSAGE_SCHEDULED|F_MESSAGE_PROCESSING); + queue_update_envelope(messagep); } - if ((message.type & T_MDA_MESSAGE) && (env->sc_opts & SMTPD_MDA_PAUSED)) - return; - - if ((message.type & T_MTA_MESSAGE) && (env->sc_opts & SMTPD_MTA_PAUSED)) - return; - - if (! runner_message_schedule(&message, tm)) + if (! runner_message_schedule(messagep, time(NULL))) return; - message.flags |= F_MESSAGE_SCHEDULED; - queue_update_envelope(&message); + messagep->flags |= F_MESSAGE_SCHEDULED; + queue_update_envelope(messagep); - hval = queue_hash(msgid); - if (! bsnprintf(evppath, MAXPATHLEN, "%s/%d/%s%s/%s", PATH_QUEUE, hval, - msgid, PATH_ENVELOPES, evpid)) + if (! bsnprintf(evppath, MAXPATHLEN, "%s/%d/%s%s/%s", PATH_QUEUE, + queue_hash(messagep->message_id), messagep->message_id, + PATH_ENVELOPES, messagep->message_uid)) fatal("runner_process_envelope: snprintf"); - if (! bsnprintf(rqpath, MAXPATHLEN, "%s/%s", PATH_RUNQUEUE, evpid)) + if (! bsnprintf(rqpath, MAXPATHLEN, "%s/%s", PATH_RUNQUEUE, + messagep->message_uid)) fatal("runner_process_envelope: snprintf"); if (stat(rqpath, &sb) == -1) { @@ -645,6 +577,83 @@ runner_process_batchqueue(struct smtpd *env) } } +void +runner_envelope_rewind(void) +{ + if (dir_queue != NULL) { + closedir(dir_queue); + closedir(dir_bucket); + closedir(dir_envelope); + } + + curdir = dir_queue = opendir(PATH_QUEUE); + if (curdir == NULL) + fatal("runner_envelope_rewind: opendir"); +} + +int +runner_envelope_next(struct message *messagep) +{ + static int bucket; + char path[MAXPATHLEN]; + const char *errstr; + struct dirent *dp; + +again: + dp = readdir(curdir); + + if (dp == NULL) + closedir(curdir); + else if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0) + goto again; + + if (curdir == dir_queue) { + if (dp == NULL) { + dir_queue = NULL; + return (0); + } + bucket = strtonum(dp->d_name, 0, DIRHASH_BUCKETS - 1, &errstr); + if (errstr) { + log_warnx("runner_envelope_next: invalid bucket: %s/%s", + PATH_QUEUE, dp->d_name); + goto again; + } + if (! bsnprintf(path, MAXPATHLEN, "%s/%d", PATH_QUEUE, bucket)) + fatalx("runner_envelope_next: snprintf"); + curdir = dir_bucket = opendir(path); + goto recurse; + } + + if (curdir == dir_bucket) { + if (dp == NULL) { + curdir = dir_queue; + goto again; + } + if (! bsnprintf(path, MAXPATHLEN, "%s/%d/%s%s", PATH_QUEUE, + bucket, dp->d_name, PATH_ENVELOPES)) + fatalx("runner_envelope_next: snprintf"); + curdir = dir_envelope = opendir(path); + goto recurse; + } + + if (curdir == dir_envelope) { + if (dp == NULL) { + curdir = dir_bucket; + goto again; + } + queue_load_envelope(messagep, dp->d_name); + return (1); + } + +recurse: + if (curdir == NULL) { + if (errno == ENOENT) + goto again; + fatal("runner_envelope_next: opendir failed"); + } + goto again; +} + int runner_batch_resolved(struct smtpd *env, struct batch *lookup) { -- cgit v1.2.3