diff options
author | Gilles Chehade <gilles@cvs.openbsd.org> | 2008-11-11 21:13:15 +0000 |
---|---|---|
committer | Gilles Chehade <gilles@cvs.openbsd.org> | 2008-11-11 21:13:15 +0000 |
commit | f1dd1c28bcd0f255b550709592cafc556407ae6b (patch) | |
tree | 6255c6228e52541ddf692be3fc5e9e91a31c1f13 /usr.sbin | |
parent | 501accc2edbef1db38b03277b505cc427054d18a (diff) |
- introduce queue_init_submissions() which will sanitize the disk-based
queue at startup: catches left overs from interrupted sessions,
reset F_MESSAGE_INPROCESS so that messages which were in MTA or
MDA gets scheduled again.
- temporarily comment chl@'s O_EXLOCK -> fcntl change until we figure
why it locks my mailbox under load
Diffstat (limited to 'usr.sbin')
-rw-r--r-- | usr.sbin/smtpd/queue.c | 78 |
1 files changed, 63 insertions, 15 deletions
diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c index 9d35638c45a..2705294caee 100644 --- a/usr.sbin/smtpd/queue.c +++ b/usr.sbin/smtpd/queue.c @@ -1,4 +1,4 @@ -/* $OpenBSD: queue.c,v 1.9 2008/11/11 01:08:08 gilles Exp $ */ +/* $OpenBSD: queue.c,v 1.10 2008/11/11 21:13:14 gilles Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> @@ -78,6 +78,7 @@ void queue_load_submissions(struct smtpd *, time_t); int queue_message_schedule(struct message *, time_t); int queue_message_from_id(char *, struct message *); int queue_message_complete(struct message *); +int queue_init_submissions(void); void queue_sig_handler(int sig, short event, void *p) @@ -572,10 +573,10 @@ queue_timeout(int fd, short event, void *p) batchp != NULL; batchp = nxt) { nxt = SPLAY_NEXT(batchtree, &env->batch_queue, batchp); - if ((batchp->type & T_MTA_BATCH) && - (batchp->flags & F_BATCH_RESOLVED) == 0) + (batchp->flags & F_BATCH_RESOLVED) == 0) { continue; + } batch_send(env, batchp, curtime); @@ -620,6 +621,7 @@ queue_load_submissions(struct smtpd *env, time_t tm) } continue; } + message.lasttry = tm; message.flags |= F_MESSAGE_PROCESSING; queue_update_database(&message); @@ -636,7 +638,6 @@ queue_load_submissions(struct smtpd *env, time_t tm) batchp = queue_record_batch(env, messagep); if (messagep->batch_id == 0) messagep->batch_id = batchp->id; - } closedir(dirp); @@ -724,6 +725,11 @@ queue(struct smtpd *env) fatal("queue: cannot drop privileges"); #endif + SPLAY_INIT(&env->batch_queue); + + queue_init_submissions(); + queue_load_submissions(env, time(NULL)); + event_init(); signal_set(&ev_sigint, SIGINT, queue_sig_handler, env); @@ -735,8 +741,6 @@ queue(struct smtpd *env) config_peers(env, peers, 5); - SPLAY_INIT(&env->batch_queue); - queue_setup_events(env); event_dispatch(); queue_shutdown(); @@ -809,7 +813,7 @@ queue_record_submission(struct message *message) char *spool; size_t spoolsz; int fd; - int mode = O_CREAT|O_TRUNC|O_WRONLY|O_EXCL|O_SYNC; + int mode = O_CREAT|O_TRUNC|O_WRONLY|O_EXCL|O_SYNC|O_EXCL; int spret; FILE *fp; int hm; @@ -859,8 +863,8 @@ queue_record_submission(struct message *message) if (unlink(linkname) == -1) fatal("queue_record_submission: unlink"); - if (flock(fd, LOCK_EX) == -1) - fatal("queue_record_submission: flock"); +// if (flock(fd, LOCK_EX) == -1) +// fatal("queue_record_submission: flock"); fp = fdopen(fd, "w"); if (fp == NULL) @@ -1136,9 +1140,6 @@ queue_update_database(struct message *message) if ((fd = open(pathname, O_RDWR)) == -1) fatal("queue_update_database: cannot open database"); - if (flock(fd, LOCK_EX) == -1) - fatal("queue_update_database: cannot get a lock on database"); - fp = fdopen(fd, "w"); if (fp == NULL) fatal("fdopen"); @@ -1163,7 +1164,7 @@ queue_record_daemon(struct message *message) char message_uid[MAXPATHLEN]; size_t spoolsz; int fd; - int mode = O_CREAT|O_TRUNC|O_WRONLY|O_EXCL|O_SYNC; + int mode = O_CREAT|O_TRUNC|O_WRONLY|O_EXCL|O_SYNC|O_EXLOCK; int spret; FILE *fp; @@ -1200,8 +1201,8 @@ queue_record_daemon(struct message *message) if (unlink(linkname) == -1) err(1, "unlink"); - if (flock(fd, LOCK_EX) == -1) - err(1, "flock"); +// if (flock(fd, LOCK_EX) == -1) +// err(1, "flock"); fp = fdopen(fd, "w"); if (fp == NULL) @@ -1277,6 +1278,53 @@ batch_cmp(struct batch *s1, struct batch *s2) } int +queue_init_submissions(void) +{ + DIR *dirp; + struct dirent *dp; + struct message message; + char pathname[MAXPATHLEN]; + FILE *fp; + int spret; + + dirp = opendir(PATH_ENVELOPES); + if (dirp == NULL) + err(1, "opendir"); + + while ((dp = readdir(dirp)) != NULL) { + + if (dp->d_name[0] == '.') + continue; + + spret = snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_ENVELOPES, + dp->d_name); + if (spret == -1 || spret >= MAXPATHLEN) + continue; + + fp = fopen(pathname, "r"); + if (fp == NULL) + continue; + + if (fread(&message, 1, sizeof(struct message), fp) != + sizeof(struct message)) { + fclose(fp); + continue; + } + fclose(fp); + + if ((message.flags & F_MESSAGE_COMPLETE) == 0) + unlink(pathname); + else { + message.flags &= ~F_MESSAGE_PROCESSING; + queue_update_database(&message); + } + } + + closedir(dirp); + return 1; +} + +int queue_message_complete(struct message *messagep) { DIR *dirp; |