diff options
author | Gilles Chehade <gilles@cvs.openbsd.org> | 2008-12-03 17:58:01 +0000 |
---|---|---|
committer | Gilles Chehade <gilles@cvs.openbsd.org> | 2008-12-03 17:58:01 +0000 |
commit | adad4802d329834a5277b8ae5f11b1b89c3e20f6 (patch) | |
tree | f4200b2f16292f177afebcf74f9e69d829e2c6db | |
parent | 409ada53e614d1a9e6c07f9faf58c630baca98dc (diff) |
- fix event masking issues in the smtp process which could lead to a fatal()
if the queue process did not answer an imsg fast enough. Spotted by
Jacek Masiulaniec <jacekm@dobremiasto.net>
- the old queue layout was mostly there to bootstrap the project; it does not
behave well under load, and it does complex things to stay in a recoverable
state, which it probably didn't do too well. The new queue code is simpler,
smaller and allows for atomic submissions (a mail can never be in a
state where it needs to be recovered). It still needs some work but
works better than the previous code, with no regressions.
-rw-r--r-- | usr.sbin/smtpd/queue.c | 1303 | ||||
-rw-r--r-- | usr.sbin/smtpd/smtp.c | 24 | ||||
-rw-r--r-- | usr.sbin/smtpd/smtp_session.c | 95 | ||||
-rw-r--r-- | usr.sbin/smtpd/smtpd.c | 17 | ||||
-rw-r--r-- | usr.sbin/smtpd/smtpd.h | 34 |
5 files changed, 934 insertions, 539 deletions
diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c index aa952c596ee..9021a8df666 100644 --- a/usr.sbin/smtpd/queue.c +++ b/usr.sbin/smtpd/queue.c @@ -1,4 +1,4 @@ -/* $OpenBSD: queue.c,v 1.16 2008/11/25 20:26:40 gilles Exp $ */ +/* $OpenBSD: queue.c,v 1.17 2008/12/03 17:58:00 gilles Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> @@ -54,27 +54,35 @@ void queue_dispatch_lka(int, short, void *); void queue_setup_events(struct smtpd *); void queue_disable_events(struct smtpd *); void queue_timeout(int, short, void *); -int queue_create_message_file(char *); -void queue_delete_message_file(char *); -int queue_record_submission(struct message *); -int queue_remove_submission(struct message *); -struct batch *batch_lookup(struct smtpd *, struct message *); -int batch_schedule(struct batch *, time_t); -void batch_unschedule(struct batch *); -void batch_send(struct smtpd *, struct batch *, time_t); -int queue_update_database(struct message *); +void queue_process_runqueue(int, short, void *); +int queue_create_incoming_layout(char *); +int queue_record_envelope(struct message *); +int queue_remove_envelope(struct message *); int queue_open_message_file(struct batch *); int queue_batch_resolved(struct smtpd *, struct batch *); +int queue_message_schedule(struct message *, time_t); +void queue_delete_message_file(char *); +u_int16_t queue_message_hash(struct message *); +int queue_record_incoming_envelope(struct message *); +int queue_update_incoming_envelope(struct message *); +int queue_remove_incoming_envelope(struct message *); +int queue_commit_incoming_message(struct message *); +void queue_delete_incoming_message(char *); +int queue_update_envelope(struct message *); +int queue_open_incoming_message_file(struct message *); +void queue_process(struct smtpd *); +int queue_process_bucket(struct smtpd *, u_int16_t); +int queue_process_message(struct smtpd *, char *); +void queue_process_envelope(struct smtpd *, char *, char *); +int 
queue_load_envelope(struct message *, char *); +void queue_delete_message(char *); + +void batch_send(struct smtpd *, struct batch *, time_t); +u_int32_t hash(u_int8_t *, size_t); struct batch *queue_record_batch(struct smtpd *, struct message *); struct batch *batch_by_id(struct smtpd *, u_int64_t); +struct batch *batch_lookup(struct smtpd *, struct message *); struct message *message_by_id(struct smtpd *, struct batch *, u_int64_t); -void debug_display_batch(struct batch *); -void debug_display_message(struct message *); -void queue_load_submissions(struct smtpd *, time_t); -int queue_message_schedule(struct message *, time_t); -int queue_message_from_id(char *, struct message *); -int queue_message_complete(struct message *); -int queue_init_submissions(void); void queue_sig_handler(int sig, short event, void *p) @@ -171,28 +179,32 @@ queue_dispatch_smtp(int sig, short event, void *p) break; switch (imsg.hdr.type) { - case IMSG_QUEUE_CREATE_MESSAGE_FILE: { + case IMSG_QUEUE_CREATE_MESSAGE: { struct message *messagep; struct submit_status ss; - int fd; log_debug("mfa_dispatch_smtp: creating message file"); messagep = imsg.data; ss.id = messagep->session_id; ss.code = 250; - fd = queue_create_message_file(ss.u.msgid); - imsg_compose(ibuf, IMSG_SMTP_MESSAGE_FILE, 0, 0, fd, + bzero(ss.u.msgid, MAXPATHLEN); + + if (! 
queue_create_incoming_layout(ss.u.msgid)) + ss.code = 421; + + imsg_compose(ibuf, IMSG_SMTP_MESSAGE_ID, 0, 0, -1, &ss, sizeof(ss)); break; } - case IMSG_QUEUE_DELETE_MESSAGE_FILE: { + case IMSG_QUEUE_REMOVE_MESSAGE: { struct message *messagep; messagep = imsg.data; - queue_delete_message_file(messagep->message_id); + if (messagep->message_id[0] != '\0') + queue_delete_incoming_message(messagep->message_id); break; } - case IMSG_QUEUE_MESSAGE_SUBMIT: { + case IMSG_QUEUE_SUBMIT_ENVELOPE: { struct message *messagep; struct submit_status ss; @@ -209,13 +221,17 @@ queue_dispatch_smtp(int sig, short event, void *p) messagep->type = T_MTA_MESSAGE; /* Write to disk */ - queue_record_submission(messagep); + if (! queue_record_incoming_envelope(messagep)) { + ss.code = 421; + imsg_compose(ibuf, IMSG_SMTP_SUBMIT_ACK, 0, 0, -1, + &ss, sizeof(ss)); + break; + } + imsg_compose(ibuf, IMSG_SMTP_SUBMIT_ACK, 0, 0, -1, &ss, sizeof(ss)); if (messagep->type & T_MTA_MESSAGE) { - messagep->flags |= F_MESSAGE_READY; - queue_update_database(messagep); break; } @@ -235,20 +251,37 @@ queue_dispatch_smtp(int sig, short event, void *p) break; } - case IMSG_QUEUE_MESSAGE_COMPLETE: { + case IMSG_QUEUE_COMMIT_MESSAGE: { struct message *messagep; struct submit_status ss; messagep = imsg.data; ss.id = messagep->session_id; - queue_message_complete(messagep); + if (! 
queue_commit_incoming_message(messagep)) + ss.code = 421; imsg_compose(ibuf, IMSG_SMTP_SUBMIT_ACK, 0, 0, -1, &ss, sizeof(ss)); break; } + case IMSG_QUEUE_MESSAGE_FILE: { + struct message *messagep; + struct submit_status ss; + int fd; + + messagep = imsg.data; + ss.id = messagep->session_id; + + fd = queue_open_incoming_message_file(messagep); + if (fd == -1) + ss.code = 421; + + imsg_compose(ibuf, IMSG_SMTP_MESSAGE_FILE, 0, 0, fd, + &ss, sizeof(ss)); + break; + } default: log_debug("queue_dispatch_smtp: unexpected imsg %d", imsg.hdr.type); @@ -306,7 +339,7 @@ queue_dispatch_mda(int sig, short event, void *p) if (messagep->status & S_MESSAGE_TEMPFAILURE) { messagep->status &= ~S_MESSAGE_TEMPFAILURE; messagep->flags &= ~F_MESSAGE_PROCESSING; - queue_update_database(messagep); + queue_update_envelope(messagep); break; } @@ -319,18 +352,17 @@ queue_dispatch_mda(int sig, short event, void *p) messagep->id = queue_generate_id(); messagep->batch_id = 0; messagep->type |= T_DAEMON_MESSAGE; - messagep->flags |= F_MESSAGE_READY; messagep->lasttry = 0; messagep->retry = 0; - queue_record_submission(messagep); + queue_record_envelope(messagep); *messagep = msave; } - queue_remove_submission(messagep); + queue_remove_envelope(messagep); break; } - /* no error, remove submission */ - queue_remove_submission(messagep); + /* no error, remove envelope */ + queue_remove_envelope(messagep); break; } @@ -402,7 +434,7 @@ queue_dispatch_mta(int sig, short event, void *p) if (messagep->status & S_MESSAGE_TEMPFAILURE) { messagep->status &= ~S_MESSAGE_TEMPFAILURE; messagep->flags &= ~F_MESSAGE_PROCESSING; - queue_update_database(messagep); + queue_update_envelope(messagep); break; } @@ -415,18 +447,17 @@ queue_dispatch_mta(int sig, short event, void *p) messagep->id = queue_generate_id(); messagep->batch_id = 0; messagep->type |= T_DAEMON_MESSAGE; - messagep->flags |= F_MESSAGE_READY; messagep->lasttry = 0; messagep->retry = 0; - queue_record_submission(messagep); + 
queue_record_envelope(messagep); *messagep = msave; } - queue_remove_submission(messagep); + queue_remove_envelope(messagep); break; } - /* no error, remove submission */ - queue_remove_submission(messagep); + /* no error, remove envelope */ + queue_remove_envelope(messagep); break; } @@ -483,12 +514,7 @@ queue_dispatch_lka(int sig, short event, void *p) messagep = imsg.data; messagep->id = queue_generate_id(); messagep->batch_id = 0; - queue_record_submission(messagep); - - if (messagep->type & T_MTA_MESSAGE) { - messagep->flags |= F_MESSAGE_READY; - queue_update_database(messagep); - } + queue_record_incoming_envelope(messagep); if (messagep->type & T_MDA_MESSAGE) { imsg_compose(ibuf, IMSG_LKA_FORWARD_LOOKUP, 0, 0, -1, @@ -503,8 +529,7 @@ queue_dispatch_lka(int sig, short event, void *p) messagep = (struct message *)imsg.data; messagep->id = queue_generate_id(); messagep->batch_id = 0; - messagep->flags |= F_MESSAGE_READY; - queue_record_submission(messagep); + queue_record_incoming_envelope(messagep); break; } @@ -512,7 +537,7 @@ queue_dispatch_lka(int sig, short event, void *p) struct message *messagep; messagep = (struct message *)imsg.data; - queue_remove_submission(messagep); + queue_remove_incoming_envelope(messagep); break; } @@ -543,9 +568,14 @@ queue_setup_events(struct smtpd *env) struct timeval tv; evtimer_set(&env->sc_ev, queue_timeout, env); - tv.tv_sec = 1; - tv.tv_usec = 0; + tv.tv_sec = 0; + tv.tv_usec = 10; evtimer_add(&env->sc_ev, &tv); + + evtimer_set(&env->sc_rqev, queue_process_runqueue, env); + tv.tv_sec = 0; + tv.tv_usec = 10; + evtimer_add(&env->sc_rqev, &tv); } void @@ -558,12 +588,13 @@ void queue_timeout(int fd, short event, void *p) { struct smtpd *env = p; - struct batch *batchp, *nxt; struct timeval tv; time_t curtime; + struct batch *batchp, *nxt; + + queue_process(env); curtime = time(NULL); - queue_load_submissions(env, curtime); for (batchp = SPLAY_MIN(batchtree, &env->batch_queue); batchp != NULL; @@ -582,95 +613,11 @@ 
queue_timeout(int fd, short event, void *p) } - tv.tv_sec = 5; - tv.tv_usec = 0; + tv.tv_sec = 0; + tv.tv_usec = 10; evtimer_add(&env->sc_ev, &tv); } -void -queue_load_submissions(struct smtpd *env, time_t tm) -{ - DIR *dirp; - struct dirent *dp; - struct batch *batchp; - struct message *messagep; - struct message message; - - dirp = opendir(PATH_ENVELOPES); - if (dirp == NULL) - err(1, "opendir"); - - while ((dp = readdir(dirp)) != NULL) { - - if (dp->d_name[0] == '.') - continue; - - if (! queue_message_from_id(dp->d_name, &message)) { - warnx("failed to load message \"%s\"", dp->d_name); - continue; - } - - if (! queue_message_schedule(&message, tm)) { - if (message.flags & F_MESSAGE_EXPIRED) { - log_debug("message expired, create mdaemon"); - queue_remove_submission(&message); - } - continue; - } - - message.lasttry = tm; - message.flags |= F_MESSAGE_PROCESSING; - queue_update_database(&message); - - messagep = calloc(1, sizeof (struct message)); - if (messagep == NULL) - err(1, "calloc"); - *messagep = message; - - batchp = batch_lookup(env, messagep); - if (batchp != NULL) - messagep->batch_id = batchp->id; - - batchp = queue_record_batch(env, messagep); - if (messagep->batch_id == 0) - messagep->batch_id = batchp->id; - } - - closedir(dirp); -} - -int -queue_message_from_id(char *message_id, struct message *message) -{ - int spret; - char pathname[MAXPATHLEN]; - FILE *fp; - - spret = snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_ENVELOPES, message_id); - if (spret == -1 || spret >= MAXPATHLEN) { - warnx("queue_load_submissions: filename too long."); - return 0; - } - - fp = fopen(pathname, "r"); - if (fp == NULL) { - warnx("queue_load_submissions: fopen: %s", message_id); - goto bad; - } - - if (fread(message, sizeof(struct message), 1, fp) != 1) { - warnx("queue_load_submissions: fread: %s", message_id); - goto bad; - } - - fclose(fp); - return 1; -bad: - if (fp != NULL) - fclose(fp); - return 0; -} - pid_t queue(struct smtpd *env) { @@ -722,9 +669,6 @@ 
queue(struct smtpd *env) SPLAY_INIT(&env->batch_queue); - queue_init_submissions(); - queue_load_submissions(env, time(NULL)); - event_init(); signal_set(&ev_sigint, SIGINT, queue_sig_handler, env); @@ -743,145 +687,272 @@ queue(struct smtpd *env) return (0); } -u_int64_t -queue_generate_id(void) +void +queue_process(struct smtpd *env) { - u_int64_t id; - struct timeval tp; + u_int16_t cbucket = 0; + static u_int16_t lbucket = 0; + DIR *dirp; + struct dirent *dp; + const char *errstr; + static u_int8_t bucketdone = 1; - if (gettimeofday(&tp, NULL) == -1) - fatal("queue_generate_id: time"); + if (! bucketdone) { + bucketdone = queue_process_bucket(env, lbucket); + if (bucketdone) + lbucket = (lbucket + 1) % DIRHASH_BUCKETS; + return; + } - id = (u_int32_t)tp.tv_sec; - id <<= 32; - id |= (u_int32_t)tp.tv_usec; - usleep(1); + dirp = opendir(PATH_QUEUE); + if (dirp == NULL) + fatal("queue_process: opendir"); - return (id); + while ((dp = readdir(dirp)) != NULL) { + + if (strcmp(dp->d_name, ".") == 0 || + strcmp(dp->d_name, "..") == 0) + continue; + + cbucket = strtonum(dp->d_name, 0, DIRHASH_BUCKETS - 1, &errstr); + if (errstr) { + log_warn("queue_process: %s/%s is not a valid bucket", + PATH_QUEUE, dp->d_name); + continue; + } + + if (cbucket == lbucket) + break; + } + closedir(dirp); + + if (dp == NULL) { + lbucket = (lbucket + 1) % DIRHASH_BUCKETS; + return; + } + + bucketdone = queue_process_bucket(env, cbucket); + if (bucketdone) + lbucket = (lbucket + 1) % DIRHASH_BUCKETS; } int -queue_create_message_file(char *message_id) +queue_process_bucket(struct smtpd *env, u_int16_t bucket) { - int fd; - char pathname[MAXPATHLEN]; int spret; + static DIR *dirp = NULL; + struct dirent *dp; + static char *msgid = NULL; + char bucketpath[MAXPATHLEN]; + static u_int8_t messagedone = 1; - spret = snprintf(pathname, MAXPATHLEN, "%s/%d.XXXXXXXXXXXXXXXX", - PATH_MESSAGES, time(NULL)); + if (! messagedone) { + messagedone = queue_process_message(env, msgid); + if (! 
messagedone) + return 0; + msgid = NULL; + } + + spret = snprintf(bucketpath, MAXPATHLEN, "%s/%d", PATH_QUEUE, bucket); if (spret == -1 || spret >= MAXPATHLEN) - return -1; + fatal("queue_process_bucket: snprintf"); + + if (dirp == NULL) { + dirp = opendir(bucketpath); + if (dirp == NULL) + fatal("queue_process_bucket: opendir"); + } - fd = mkstemp(pathname); - if (fd == -1) - fatal("queue_create_message_file: mkstemp"); + while ((dp = readdir(dirp)) != NULL) { - /* XXX - this won't fail if message_id is MAXPATHLEN bytes */ - if (strlcpy(message_id, pathname + sizeof(PATH_MESSAGES), MAXPATHLEN) - >= MAXPATHLEN) - fatal("queue_create_message_file: message id too long"); + if (strcmp(dp->d_name, ".") == 0 || + strcmp(dp->d_name, "..") == 0) + continue; - return fd; + break; + } + + if (dp != NULL) { + msgid = dp->d_name; + messagedone = queue_process_message(env, msgid); + if (! messagedone) + return 0; + msgid = NULL; + } + + closedir(dirp); + dirp = NULL; + return 1; } -void -queue_delete_message_file(char *message_id) +int +queue_process_message(struct smtpd *env, char *messageid) { - char pathname[MAXPATHLEN]; int spret; + static DIR *dirp = NULL; + struct dirent *dp; + char evppath[MAXPATHLEN]; + u_int16_t hval = 0; - spret = snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_MESSAGES, message_id); + hval = hash(messageid, strlen(messageid)) % DIRHASH_BUCKETS; + + spret = snprintf(evppath, MAXPATHLEN, "%s/%d/%s%s", PATH_QUEUE, hval, + messageid, PATH_ENVELOPES); if (spret == -1 || spret >= MAXPATHLEN) - fatal("queue_delete_message_file: message id too long"); + fatal("queue_process_message: snprintf"); - if (unlink(pathname) == -1) - fatal("queue_delete_message_file: unlink"); + if (dirp == NULL) { + dirp = opendir(evppath); + if (dirp == NULL) + fatal("queue_process_message: opendir"); + } + + while ((dp = readdir(dirp)) != NULL) { + + if (strcmp(dp->d_name, ".") == 0 || + strcmp(dp->d_name, "..") == 0) + continue; + break; + } + + if (dp != NULL) { + 
queue_process_envelope(env, messageid, dp->d_name); + return 0; + } + + closedir(dirp); + dirp = NULL; + return 1; } -int -queue_record_submission(struct message *message) +void +queue_process_envelope(struct smtpd *env, char *msgid, char *evpid) { - char pathname[MAXPATHLEN]; - char linkname[MAXPATHLEN]; - char dbname[MAXPATHLEN]; - char message_uid[MAXPATHLEN]; - char *spool; - size_t spoolsz; - int fd; - int mode = O_CREAT|O_TRUNC|O_WRONLY|O_EXCL|O_SYNC; int spret; - FILE *fp; + struct message message; + time_t tm; + char evppath[MAXPATHLEN]; + char rqpath[MAXPATHLEN]; + u_int16_t hval; + struct stat sb; - if (message->type & T_DAEMON_MESSAGE) { - spool = PATH_DAEMON; + if (! queue_load_envelope(&message, evpid)) { + log_debug("failed to load envelope: %s", evpid); + return; } - else { - switch (message->recipient.rule.r_action) { - case A_MBOX: - case A_MAILDIR: - case A_EXT: - spool = PATH_LOCAL; - break; - default: - spool = PATH_RELAY; + + tm = time(NULL); + + if (! queue_message_schedule(&message, tm)) { + if (message.flags & F_MESSAGE_EXPIRED) { + log_debug("message has expired, mdaemon"); + queue_remove_envelope(&message); } + return; } - spoolsz = strlen(spool); - spret = snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_MESSAGES, - message->message_id); + message.flags |= F_MESSAGE_SCHEDULED; + queue_update_envelope(&message); + + log_debug("SCHEDULED: %s", evpid); + hval = hash(msgid, strlen(msgid)) % DIRHASH_BUCKETS; + spret = snprintf(evppath, MAXPATHLEN, "%s/%d/%s%s/%s", PATH_QUEUE, hval, + msgid, PATH_ENVELOPES, evpid); if (spret == -1 || spret >= MAXPATHLEN) - fatal("queue_record_submission: message id too long"); + fatal("queue_process_envelope: snprintf"); - for (;;) { - spret = snprintf(linkname, MAXPATHLEN, "%s/%s.%qu", spool, - message->message_id, (u_int64_t)arc4random()); - if (spret == -1 || spret >= MAXPATHLEN) - fatal("queue_record_submission: message uid too long"); + spret = snprintf(rqpath, MAXPATHLEN, "%s/%s", PATH_RUNQUEUE, evpid); + if 
(spret == -1 || spret >= MAXPATHLEN) + fatal("queue_process_envelope: snprintf"); - (void)strlcpy(message_uid, linkname + spoolsz + 1, MAXPATHLEN); + if (stat(rqpath, &sb) == -1) { + if (errno != ENOENT) + fatal("queue_process_envelope: stat"); - if (link(pathname, linkname) == -1) { - if (errno == EEXIST) - continue; - err(1, "link: %s , %s", pathname, linkname); + if (symlink(evppath, rqpath) == -1) { + log_info("queue_process_envelope: " + "failed to place envelope in runqueue"); } + } +} - spret = snprintf(dbname, MAXPATHLEN, "%s/%s", PATH_ENVELOPES, - message_uid); - if (spret == -1 || spret >= MAXPATHLEN) - fatal("queue_record_submission: database uid too long"); - - fd = open(dbname, mode, 0600); - if (fd == -1) - if (unlink(linkname) == -1) - fatal("queue_record_submission: unlink"); +void +queue_process_runqueue(int fd, short event, void *p) +{ + DIR *dirp; + struct dirent *dp; + struct message message; + struct message *messagep; + struct batch *batchp; + char pathname[MAXPATHLEN]; + time_t tm; + struct smtpd *env = p; + struct timeval tv; - if (flock(fd, LOCK_EX) == -1) - fatal("queue_record_submission: flock"); + tm = time(NULL); - fp = fdopen(fd, "w"); - if (fp == NULL) - fatal("fdopen"); + dirp = opendir(PATH_RUNQUEUE); + if (dirp == NULL) + fatal("queue_process_runqueue: opendir"); - if (strlcpy(message->message_uid, message_uid, MAXPATHLEN) - >= MAXPATHLEN) - fatal("queue_record_submission: message uid too long"); + while ((dp = readdir(dirp)) != NULL) { + if (strcmp(dp->d_name, ".") == 0 || + strcmp(dp->d_name, "..") == 0) + continue; - message->creation = time(NULL); + /* XXX */ + snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_RUNQUEUE, dp->d_name); + unlink(pathname); - if (fwrite(message, sizeof(struct message), 1, fp) != 1) { - fclose(fp); - unlink(dbname); - return 0; + if (! 
queue_load_envelope(&message, dp->d_name)) { + log_debug("failed to load envelope"); + continue; } - fflush(fp); - fsync(fd); - fclose(fp); - break; + if (message.flags & F_MESSAGE_PROCESSING) + continue; + + message.lasttry = tm; + message.flags &= ~F_MESSAGE_SCHEDULED; + message.flags |= F_MESSAGE_PROCESSING; + queue_update_envelope(&message); + + messagep = calloc(1, sizeof (struct message)); + if (messagep == NULL) + err(1, "calloc"); + *messagep = message; + + batchp = batch_lookup(env, messagep); + if (batchp != NULL) + messagep->batch_id = batchp->id; + + batchp = queue_record_batch(env, messagep); + if (messagep->batch_id == 0) + messagep->batch_id = batchp->id; } - return 1; + + closedir(dirp); + + tv.tv_sec = 0; + tv.tv_usec = 10; + evtimer_add(&env->sc_rqev, &tv); +} + +u_int64_t +queue_generate_id(void) +{ + u_int64_t id; + struct timeval tp; + + if (gettimeofday(&tp, NULL) == -1) + fatal("queue_generate_id: time"); + + id = (u_int32_t)tp.tv_sec; + id <<= 32; + id |= (u_int32_t)tp.tv_usec; + usleep(1); + + return (id); } struct batch * @@ -941,71 +1012,6 @@ queue_record_batch(struct smtpd *env, struct message *messagep) } int -queue_remove_submission(struct message *message) -{ - char pathname[MAXPATHLEN]; - char linkname[MAXPATHLEN]; - char dbname[MAXPATHLEN]; - char *spool; - struct stat sb; - int spret; - - if (message->type & T_DAEMON_MESSAGE) { - spool = PATH_DAEMON; - } - else { - switch (message->recipient.rule.r_action) { - case A_MBOX: - case A_MAILDIR: - case A_EXT: - spool = PATH_LOCAL; - break; - default: - spool = PATH_RELAY; - } - } - - spret = snprintf(dbname, MAXPATHLEN, "%s/%s", PATH_ENVELOPES, - message->message_uid); - if (spret == -1 || spret >= MAXPATHLEN) - fatal("queue_remove_submission: database uid too long"); - - spret = snprintf(linkname, MAXPATHLEN, "%s/%s", spool, - message->message_uid); - if (spret == -1 || spret >= MAXPATHLEN) - fatal("queue_remove_submission: message uid too long"); - - spret = snprintf(pathname, 
MAXPATHLEN, "%s/%s", PATH_MESSAGES, - message->message_id); - if (spret == -1 || spret >= MAXPATHLEN) - fatal("queue_remove_submission: message id too long"); - - if (unlink(dbname) == -1) { - warnx("dbname: %s", dbname); - fatal("queue_remove_submission: unlink"); - } - - if (unlink(linkname) == -1) { - warnx("linkname: %s", linkname); - fatal("queue_remove_submission: unlink"); - } - - if (stat(pathname, &sb) == -1) { - warnx("pathname: %s", pathname); - fatal("queue_remove_submission: stat"); - } - - if (sb.st_nlink == 1) { - if (unlink(pathname) == -1) { - warnx("pathname: %s", pathname); - fatal("queue_remove_submission: unlink"); - } - } - - return 1; -} - -int queue_remove_batch_message(struct smtpd *env, struct batch *batchp, struct message *messagep) { TAILQ_REMOVE(&batchp->messages, messagep, entry); @@ -1082,75 +1088,6 @@ queue_batch_resolved(struct smtpd *env, struct batch *lookup) return 1; } -int -queue_open_message_file(struct batch *batch) -{ - int fd; - char pathname[MAXPATHLEN]; - int spret; - - spret = snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_MESSAGES, - batch->message_id); - if (spret == -1 || spret >= MAXPATHLEN) - fatal("queue_open_message_file: message id too long"); - - fd = open(pathname, O_RDONLY); - if (fd == -1) - fatal("queue_open_message_file: open"); - - return fd; -} - -int -queue_update_database(struct message *message) -{ - int fd; - char *spool; - char pathname[MAXPATHLEN]; - int spret; - FILE *fp; - mode_t mode = O_RDWR; - - if (message->type & T_DAEMON_MESSAGE) { - spool = PATH_DAEMON; - } - else { - switch (message->recipient.rule.r_action) { - case A_MBOX: - case A_MAILDIR: - case A_EXT: - spool = PATH_LOCAL; - break; - default: - spool = PATH_RELAY; - } - } - - spret = snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_ENVELOPES, - message->message_uid); - if (spret == -1 || spret >= MAXPATHLEN) - fatal("queue_update_database: pathname too long"); - - if ((fd = open(pathname, mode)) == -1) - fatal("queue_update_database: cannot 
open database"); - - - if (flock(fd, LOCK_EX) == -1) - fatal("queue_update_database: flock"); - - fp = fdopen(fd, "w"); - if (fp == NULL) - fatal("fdopen"); - - if (fwrite(message, sizeof(struct message), 1, fp) != 1) - fatal("queue_update_database: cannot write database"); - fflush(fp); - fsync(fd); - fclose(fp); - - return 1; -} - struct batch * batch_lookup(struct smtpd *env, struct message *message) { @@ -1201,98 +1138,6 @@ batch_cmp(struct batch *s1, struct batch *s2) } int -queue_init_submissions(void) -{ - DIR *dirp; - struct dirent *dp; - struct message message; - char pathname[MAXPATHLEN]; - FILE *fp; - int spret; - - dirp = opendir(PATH_ENVELOPES); - if (dirp == NULL) - err(1, "opendir"); - - while ((dp = readdir(dirp)) != NULL) { - - if (dp->d_name[0] == '.') - continue; - - spret = snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_ENVELOPES, - dp->d_name); - if (spret == -1 || spret >= MAXPATHLEN) - continue; - - fp = fopen(pathname, "r"); - if (fp == NULL) - continue; - - if (fread(&message, sizeof(struct message), 1, fp) != 1) { - fclose(fp); - continue; - } - fclose(fp); - - if ((message.flags & F_MESSAGE_COMPLETE) == 0) - unlink(pathname); - else { - message.flags &= ~F_MESSAGE_PROCESSING; - queue_update_database(&message); - } - } - - closedir(dirp); - return 1; -} - -int -queue_message_complete(struct message *messagep) -{ - DIR *dirp; - struct dirent *dp; - struct message message; - char pathname[MAXPATHLEN]; - FILE *fp; - int spret; - - dirp = opendir(PATH_ENVELOPES); - if (dirp == NULL) - err(1, "opendir"); - - while ((dp = readdir(dirp)) != NULL) { - - if (dp->d_name[0] == '.') - continue; - - if (strncmp(messagep->message_id, - dp->d_name, strlen(messagep->message_id)) != 0) - continue; - - spret = snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_ENVELOPES, - dp->d_name); - if (spret == -1 || spret >= MAXPATHLEN) - continue; - - fp = fopen(pathname, "r"); - if (fp == NULL) - continue; - - if (fread(&message, sizeof(struct message), 1, fp) != 1) { - 
fclose(fp); - continue; - } - fclose(fp); - - message.flags |= F_MESSAGE_COMPLETE; - queue_update_database(&message); - } - - closedir(dirp); - return 1; -} - -int queue_message_schedule(struct message *messagep, time_t tm) { time_t delay; @@ -1307,11 +1152,8 @@ queue_message_schedule(struct message *messagep, time_t tm) messagep->flags |= F_MESSAGE_EXPIRED; return 0; } - - if ((messagep->flags & F_MESSAGE_READY) == 0) - return 0; - - if ((messagep->flags & F_MESSAGE_COMPLETE) == 0) + + if ((messagep->flags & F_MESSAGE_SCHEDULED) != 0) return 0; if ((messagep->flags & F_MESSAGE_PROCESSING) != 0) @@ -1347,12 +1189,6 @@ queue_message_schedule(struct message *messagep, time_t tm) } void -batch_unschedule(struct batch *batchp) -{ - batchp->flags &= ~(F_BATCH_SCHEDULED); -} - -void batch_send(struct smtpd *env, struct batch *batchp, time_t curtime) { u_int8_t proctype; @@ -1390,6 +1226,7 @@ batch_by_id(struct smtpd *env, u_int64_t id) return SPLAY_FIND(batchtree, &env->batch_queue, &lookup); } + struct message * message_by_id(struct smtpd *env, struct batch *batchp, u_int64_t id) { @@ -1412,4 +1249,520 @@ message_by_id(struct smtpd *env, struct batch *batchp, u_int64_t id) return NULL; } +int +queue_create_incoming_layout(char *message_id) +{ + int spret; + char rootdir[MAXPATHLEN]; + char evpdir[MAXPATHLEN]; + + spret = snprintf(rootdir, MAXPATHLEN, "%s/%d.XXXXXXXXXXXXXXXX", + PATH_INCOMING, time(NULL)); + if (spret == -1 || spret >= MAXPATHLEN) + return -1; + + if (mkdtemp(rootdir) == NULL) + return -1; + + if (strlcpy(message_id, rootdir + strlen(PATH_INCOMING) + 1, MAXPATHLEN) + >= MAXPATHLEN) + goto badroot; + + spret = snprintf(evpdir, MAXPATHLEN, "%s%s", + rootdir, PATH_ENVELOPES); + if (spret == -1 || spret >= MAXPATHLEN) + goto badroot; + + if (mkdir(evpdir, 0700) == -1) + goto badroot; + + return 1; + +badroot: + if (rmdir(rootdir) == -1) + fatal("queue_create_incoming_layout: rmdir"); + + return 0; +} + +void +queue_delete_incoming_message(char *message_id) 
+{ + int spret; + char rootdir[MAXPATHLEN]; + char evpdir[MAXPATHLEN]; + char evppath[MAXPATHLEN]; + char msgpath[MAXPATHLEN]; + DIR *dirp; + struct dirent *dp; + + spret = snprintf(rootdir, MAXPATHLEN, "%s/%s", PATH_INCOMING, + message_id); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_delete_incoming_message: snprintf"); + + spret = snprintf(evpdir, MAXPATHLEN, "%s%s", + rootdir, PATH_ENVELOPES); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_delete_incoming_message: snprintf"); + + spret = snprintf(msgpath, MAXPATHLEN, "%s/message", rootdir); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_delete_incoming_message: snprintf"); + + if (unlink(msgpath) == -1) { + if (errno != ENOENT) + fatal("queue_delete_incoming_message: unlink"); + } + + log_debug("evpdir: %s", evpdir); + dirp = opendir(evpdir); + if (dirp == NULL) + fatal("queue_delete_incoming_message: opendir"); + while ((dp = readdir(dirp)) != NULL) { + if (strcmp(dp->d_name, ".") == 0 || + strcmp(dp->d_name, "..") == 0) + continue; + spret = snprintf(evppath, MAXPATHLEN, "%s/%s", evpdir, dp->d_name); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_create_incoming_message: snprintf"); + + if (unlink(evppath) == -1) { + if (errno != ENOENT) + fatal("queue_create_incoming_message: unlink"); + } + } + closedir(dirp); + + if (rmdir(evpdir) == -1) + if (errno != ENOENT) + fatal("queue_create_incoming_message: rmdir"); + + if (rmdir(rootdir) == -1) + if (errno != ENOENT) + fatal("queue_create_incoming_message: rmdir"); + + return; +} + +int +queue_record_incoming_envelope(struct message *message) +{ + char rootdir[MAXPATHLEN]; + char evpdir[MAXPATHLEN]; + char evpname[MAXPATHLEN]; + char message_uid[MAXPATHLEN]; + int fd; + int mode = O_CREAT|O_TRUNC|O_WRONLY|O_EXCL|O_SYNC; + int spret; + FILE *fp; + int ret; + + spret = snprintf(rootdir, MAXPATHLEN, "%s/%s", PATH_INCOMING, + message->message_id); + if (spret == -1 || spret >= MAXPATHLEN) + 
fatal("queue_record_incoming_envelope: snprintf"); + + spret = snprintf(evpdir, MAXPATHLEN, "%s%s", rootdir, PATH_ENVELOPES); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_record_incoming_envelope: snprintf"); + + for (;;) { + spret = snprintf(evpname, MAXPATHLEN, "%s/%s.%qu", evpdir, + message->message_id, (u_int64_t)arc4random()); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_record_incoming_envelope: snprintf"); + + (void)strlcpy(message_uid, evpname + strlen(evpdir) + 1, MAXPATHLEN); + + fd = open(evpname, mode, 0600); + if (fd == -1) { + if (errno == EEXIST) + continue; + return 0; + } + + if (flock(fd, LOCK_EX) == -1) + fatal("queue_record_submission: flock"); + + fp = fdopen(fd, "w"); + if (fp == NULL) + fatal("fdopen"); + + if (strlcpy(message->message_uid, message_uid, MAXPATHLEN) + >= MAXPATHLEN) + fatal("queue_record_submission: strlcpy"); + + message->creation = time(NULL); + + if ((ret = fwrite(message, sizeof (struct message), 1, fp)) != 1) { + fclose(fp); + unlink(evpname); + return 0; + } + fflush(fp); + fsync(fd); + fclose(fp); + + break; + } + return 1; +} + +int +queue_update_incoming_envelope(struct message *messagep) +{ + int fd; + char pathname[MAXPATHLEN]; + int spret; + FILE *fp; + mode_t mode = O_RDWR; + + spret = snprintf(pathname, MAXPATHLEN, "%s/%s%s/%s", PATH_INCOMING, + messagep->message_id, PATH_ENVELOPES, messagep->message_uid); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_update_incoming_envelope: snprintf"); + + if ((fd = open(pathname, mode)) == -1) + fatal("queue_update_incoming_envelope: open"); + + if (flock(fd, LOCK_EX) == -1) + fatal("queue_update_incoming_envelope: flock"); + + fp = fdopen(fd, "w"); + if (fp == NULL) + fatal("queue_update_incoming_envelope: fdopen"); + + if (fwrite(messagep, sizeof(struct message), 1, fp) != 1) + fatal("queue_update_incoming_envelope: fwrite"); + fflush(fp); + fsync(fd); + fclose(fp); + + return 1; +} + +int +queue_remove_incoming_envelope(struct message 
*message) +{ + char pathname[MAXPATHLEN]; + int spret; + + spret = snprintf(pathname, MAXPATHLEN, "%s/%s%s/%s", PATH_INCOMING, + message->message_id, PATH_ENVELOPES, message->message_uid); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_remove_incoming_envelope: snprintf"); + + if (unlink(pathname) == -1) + if (errno != ENOENT) + fatal("queue_remove_incoming_envelope: unlink"); + + return 1; +} + +int +queue_commit_incoming_message(struct message *messagep) +{ + int spret; + char rootdir[MAXPATHLEN]; + char queuedir[MAXPATHLEN]; + u_int16_t hval; + + spret = snprintf(rootdir, MAXPATHLEN, "%s/%s", PATH_INCOMING, + messagep->message_id); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_commit_message_incoming: snprintf"); + + hval = queue_message_hash(messagep); + + spret = snprintf(queuedir, MAXPATHLEN, "%s/%d", PATH_QUEUE, hval); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_commit_message_incoming: snprintf"); + + if (mkdir(queuedir, 0700) == -1) { + if (errno == ENOSPC) + return 0; + if (errno != EEXIST) + fatal("queue_commit_message_incoming: mkdir"); + } + + spret = snprintf(queuedir, MAXPATHLEN, "%s/%d/%s", PATH_QUEUE, hval, + messagep->message_id); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_commit_message_incoming: snprintf"); + + + if (rename(rootdir, queuedir) == -1) + fatal("queue_commit_message_incoming: rename"); + + return 1; +} + +int +queue_open_incoming_message_file(struct message *messagep) +{ + char pathname[MAXPATHLEN]; + int spret; + mode_t mode = O_CREAT|O_EXCL|O_RDWR; + + spret = snprintf(pathname, MAXPATHLEN, "%s/%s/message", PATH_INCOMING, + messagep->message_id); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_open_incoming_message_file: snprintf"); + + return open(pathname, mode, 0600); +} + +int +queue_record_envelope(struct message *messagep) +{ + char queuedir[MAXPATHLEN]; + char evpdir[MAXPATHLEN]; + char evpname[MAXPATHLEN]; + char message_uid[MAXPATHLEN]; + int fd; + int mode = 
O_CREAT|O_TRUNC|O_WRONLY|O_EXCL|O_SYNC; + int spret; + FILE *fp; + int ret; + u_int16_t hval; + + spret = snprintf(queuedir, MAXPATHLEN, "%s/%s", PATH_QUEUE, + messagep->message_id); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_record_envelope: snprintf"); + + hval = queue_message_hash(messagep); + + spret = snprintf(queuedir, MAXPATHLEN, "%s/%d", PATH_QUEUE, hval); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_record_envelope: snprintf"); + + spret = snprintf(evpdir, MAXPATHLEN, "%s/%s%s", queuedir, messagep->message_id, + PATH_ENVELOPES); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_record_envelope: snprintf"); + + for (;;) { + spret = snprintf(evpname, MAXPATHLEN, "%s/%s.%qu", evpdir, + messagep->message_id, (u_int64_t)arc4random()); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_record_envelope: snprintf"); + + (void)strlcpy(message_uid, evpname + strlen(evpdir) + 1, MAXPATHLEN); + + fd = open(evpname, mode, 0600); + if (fd == -1) { + if (errno == EEXIST) + continue; + log_debug("failed to open %s", evpname); + fatal("queue_record_envelope: open"); + } + + if (flock(fd, LOCK_EX) == -1) + fatal("queue_record_envelope: flock"); + + fp = fdopen(fd, "w"); + if (fp == NULL) + fatal("fdopen"); + + if (strlcpy(messagep->message_uid, message_uid, MAXPATHLEN) + >= MAXPATHLEN) + fatal("queue_record_envelope: strlcpy"); + + messagep->creation = time(NULL); + + if ((ret = fwrite(messagep, sizeof (struct message), 1, fp)) != 1) { + fclose(fp); + unlink(evpname); + return 0; + } + fflush(fp); + fsync(fd); + fclose(fp); + + break; + } + return 1; + +} + +int +queue_remove_envelope(struct message *messagep) +{ + char pathname[MAXPATHLEN]; + u_int16_t hval; + int spret; + + hval = queue_message_hash(messagep); + + spret = snprintf(pathname, MAXPATHLEN, "%s/%d/%s%s/%s", PATH_QUEUE, + hval, messagep->message_id, PATH_ENVELOPES, messagep->message_uid); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_remove_incoming_envelope: 
snprintf"); + + if (unlink(pathname) == -1) + fatal("queue_remove_incoming_envelope: unlink"); + + spret = snprintf(pathname, MAXPATHLEN, "%s/%d/%s%s", PATH_QUEUE, + hval, messagep->message_id, PATH_ENVELOPES); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_remove_incoming_envelope: snprintf"); + + if (rmdir(pathname) != -1) + queue_delete_message(messagep->message_id); + + return 1; +} + +int +queue_update_envelope(struct message *messagep) +{ + int fd; + char pathname[MAXPATHLEN]; + int spret; + FILE *fp; + mode_t mode = O_RDWR; + u_int16_t hval; + + hval = queue_message_hash(messagep); + + spret = snprintf(pathname, MAXPATHLEN, "%s/%d/%s%s/%s", PATH_QUEUE, + hval, messagep->message_id, PATH_ENVELOPES, messagep->message_uid); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_update_envelope: snprintf"); + + if ((fd = open(pathname, mode)) == -1) + fatal("queue_update_envelope: open"); + + if (flock(fd, LOCK_EX) == -1) + fatal("queue_update_envelope: flock"); + + fp = fdopen(fd, "w"); + if (fp == NULL) + fatal("queue_update_envelope: fdopen"); + + if (fwrite(messagep, sizeof(struct message), 1, fp) != 1) + fatal("queue_update_envelope: fwrite"); + fflush(fp); + fsync(fd); + fclose(fp); + + return 1; +} + +int +queue_load_envelope(struct message *messagep, char *evpid) +{ + int spret; + char pathname[MAXPATHLEN]; + u_int16_t hval; + FILE *fp; + char msgid[MAXPATHLEN]; + + strlcpy(msgid, evpid, MAXPATHLEN); + *strrchr(msgid, '.') = '\0'; + + hval = hash(msgid, strlen(msgid)) % DIRHASH_BUCKETS; + spret = snprintf(pathname, MAXPATHLEN, "%s/%d/%s%s/%s", PATH_QUEUE, + hval, msgid, PATH_ENVELOPES, evpid); + + fp = fopen(pathname, "r"); + if (fp == NULL) + return 0; + + if (fread(messagep, sizeof(struct message), 1, fp) != 1) + fatal("queue_load_envelope: fread"); + + fclose(fp); + + return 1; +} + +int +queue_open_message_file(struct batch *batchp) +{ + int fd; + char pathname[MAXPATHLEN]; + int spret; + mode_t mode = O_RDONLY; + u_int16_t hval; + + hval 
= hash(batchp->message_id, strlen(batchp->message_id)) % DIRHASH_BUCKETS; + + spret = snprintf(pathname, MAXPATHLEN, "%s/%d/%s/message", PATH_QUEUE, + hval, batchp->message_id); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_open_message_file: snprintf"); + + if ((fd = open(pathname, mode)) == -1) + fatal("queue_open_message_file: open"); + + return fd; +} + +void +queue_delete_message(char *msgid) +{ + int spret; + char rootdir[MAXPATHLEN]; + char evpdir[MAXPATHLEN]; + char msgpath[MAXPATHLEN]; + u_int16_t hval; + + hval = hash(msgid, strlen(msgid)) % DIRHASH_BUCKETS; + spret = snprintf(rootdir, MAXPATHLEN, "%s/%d/%s", PATH_QUEUE, + hval, msgid); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_delete_message: snprintf"); + + spret = snprintf(evpdir, MAXPATHLEN, "%s%s", + rootdir, PATH_ENVELOPES); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_delete_message: snprintf"); + + spret = snprintf(msgpath, MAXPATHLEN, "%s/message", rootdir); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_delete_message: snprintf"); + + if (unlink(msgpath) == -1) + if (errno != ENOENT) + fatal("queue_delete_message: unlink"); + + if (rmdir(evpdir) == -1) + if (errno != ENOENT) + fatal("queue_delete_message: rmdir"); + + if (rmdir(rootdir) == -1) + if (errno != ENOENT) + fatal("queue_delete_message: rmdir"); + + spret = snprintf(rootdir, MAXPATHLEN, "%s/%d", PATH_QUEUE, + hval); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_delete_message: snprintf"); + + rmdir(rootdir); + + return; +} + +u_int16_t +queue_message_hash(struct message *messagep) +{ + return hash(messagep->message_id, strlen(messagep->message_id)) + % DIRHASH_BUCKETS; +} + +u_int32_t +hash(u_int8_t *buf, size_t len) +{ + u_int32_t h; + + for (h = 5381; len; len--) + h = ((h << 5) + h) + *buf++; + + return h; +} + SPLAY_GENERATE(batchtree, batch, b_nodes, batch_cmp); diff --git a/usr.sbin/smtpd/smtp.c b/usr.sbin/smtpd/smtp.c index 7e456a84508..a85baa45256 100644 --- 
a/usr.sbin/smtpd/smtp.c +++ b/usr.sbin/smtpd/smtp.c @@ -1,4 +1,4 @@ -/* $OpenBSD: smtp.c,v 1.7 2008/11/24 22:30:19 gilles Exp $ */ +/* $OpenBSD: smtp.c,v 1.8 2008/12/03 17:58:00 gilles Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> @@ -358,11 +358,10 @@ smtp_dispatch_queue(int sig, short event, void *p) break; switch (imsg.hdr.type) { - case IMSG_SMTP_MESSAGE_FILE: { + case IMSG_SMTP_MESSAGE_ID: { struct submit_status *ss; struct session *s; struct session key; - int fd; log_debug("smtp_dispatch_queue: queue handled message creation"); ss = imsg.data; @@ -377,6 +376,25 @@ smtp_dispatch_queue(int sig, short event, void *p) (void)strlcpy(s->s_msg.message_id, ss->u.msgid, sizeof(s->s_msg.message_id)); + session_pickup(s, ss); + break; + } + case IMSG_SMTP_MESSAGE_FILE: { + struct submit_status *ss; + struct session *s; + struct session key; + int fd; + + log_debug("smtp_dispatch_queue: queue handled message creation"); + ss = imsg.data; + + key.s_id = ss->id; + + s = SPLAY_FIND(sessiontree, &env->sc_sessions, &key); + if (s == NULL) { + /* Session was removed while we were waiting for the message */ + break; + } fd = imsg_get_fd(ibuf, &imsg); if (fd != -1) { diff --git a/usr.sbin/smtpd/smtp_session.c b/usr.sbin/smtpd/smtp_session.c index c9ba7ed8815..96294317722 100644 --- a/usr.sbin/smtpd/smtp_session.c +++ b/usr.sbin/smtpd/smtp_session.c @@ -1,4 +1,4 @@ -/* $OpenBSD: smtp_session.c,v 1.13 2008/11/25 20:35:54 gilles Exp $ */ +/* $OpenBSD: smtp_session.c,v 1.14 2008/12/03 17:58:00 gilles Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> @@ -403,7 +403,6 @@ session_rfc5321_rcpt_handler(struct session *s, char *args) } mr.id = s->s_msg.id; - s->s_state = S_RCPTREQUEST; mr.ss = s->s_ss; @@ -446,8 +445,10 @@ session_rfc5321_data_handler(struct session *s, char *args) } s->s_state = S_DATA; - session_pickup(s, NULL); - + imsg_compose(s->s_env->sc_ibufs[PROC_QUEUE], + IMSG_QUEUE_MESSAGE_FILE, 0, 0, -1, &s->s_msg, + sizeof(s->s_msg)); 
+ bufferevent_disable(s->s_bev, EV_READ); return 1; } @@ -496,6 +497,8 @@ session_command(struct session *s, char *cmd, char *args) { int i; + bufferevent_enable(s->s_bev, EV_WRITE); + if (!(s->s_flags & F_EHLO)) goto rfc5321; @@ -548,8 +551,10 @@ session_pickup(struct session *s, struct submit_status *ss) if (s == NULL) fatal("session_pickup: desynchronized"); - bufferevent_disable(s->s_bev, EV_READ); - bufferevent_enable(s->s_bev, EV_WRITE); + bufferevent_enable(s->s_bev, EV_READ|EV_WRITE); + + if (ss != NULL && ss->code == 421) + goto tempfail; switch (s->s_state) { case S_INIT: @@ -564,9 +569,9 @@ session_pickup(struct session *s, struct submit_status *ss) break; case S_TLS: + bufferevent_disable(s->s_bev, EV_READ|EV_WRITE); s->s_state = S_GREETED; ssl_session_init(s); - bufferevent_disable(s->s_bev, EV_READ|EV_WRITE); break; case S_AUTH: @@ -592,24 +597,24 @@ session_pickup(struct session *s, struct submit_status *ss) s->s_state = S_MAIL; s->s_msg.sender = ss->u.path; + if (s->s_msg.datafp != NULL) { + fclose(s->s_msg.datafp); + s->s_msg.datafp = NULL; + imsg_compose(s->s_env->sc_ibufs[PROC_QUEUE], IMSG_QUEUE_REMOVE_MESSAGE, + 0, 0, -1, &s->s_msg, sizeof(s->s_msg)); + } + imsg_compose(s->s_env->sc_ibufs[PROC_QUEUE], - IMSG_QUEUE_CREATE_MESSAGE_FILE, 0, 0, -1, &s->s_msg, + IMSG_QUEUE_CREATE_MESSAGE, 0, 0, -1, &s->s_msg, sizeof(s->s_msg)); - + bufferevent_disable(s->s_bev, EV_READ); break; case S_MAIL: - evbuffer_add_printf(s->s_bev->output, - "%d Sender ok\r\n", ss->code); - strlcpy(s->s_msg.message_id, ss->u.msgid, MAXPATHLEN); + evbuffer_add_printf(s->s_bev->output, "%d Sender ok\r\n", + ss->code); - if (s->s_msg.datafp == NULL) { - /* Remove message file */ - imsg_compose(s->s_env->sc_ibufs[PROC_QUEUE], IMSG_QUEUE_DELETE_MESSAGE_FILE, - 0, 0, -1, &s->s_msg, sizeof(s->s_msg)); - return; - } break; case S_RCPTREQUEST: @@ -629,22 +634,20 @@ session_pickup(struct session *s, struct submit_status *ss) s->s_msg.rcptcount++; s->s_msg.recipient = ss->u.path; 
imsg_compose(s->s_env->sc_ibufs[PROC_QUEUE], - IMSG_QUEUE_MESSAGE_SUBMIT, 0, 0, -1, &s->s_msg, + IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, &s->s_msg, sizeof(s->s_msg)); - + bufferevent_disable(s->s_bev, EV_READ); break; case S_RCPT: - evbuffer_add_printf(s->s_bev->output, - "%d Recipient ok\r\n", ss->code); + evbuffer_add_printf(s->s_bev->output, "%d Recipient ok\r\n", + ss->code); break; case S_DATA: - if (s->s_msg.datafp == NULL) { - evbuffer_add_printf(s->s_bev->output, - "421 Service temporarily unavailable\r\n"); - return; - } + if (s->s_msg.datafp == NULL) + goto tempfail; + s->s_state = S_DATACONTENT; evbuffer_add_printf(s->s_bev->output, "354 Enter mail, end with \".\" on a line by itself\r\n"); @@ -655,6 +658,7 @@ session_pickup(struct session *s, struct submit_status *ss) case S_DONE: s->s_state = S_HELO; + s->s_msg.datafp = NULL; evbuffer_add_printf(s->s_bev->output, "250 %s Message accepted for delivery\r\n", @@ -667,6 +671,14 @@ session_pickup(struct session *s, struct submit_status *ss) fatal("session_pickup: unknown state"); break; } + + return; + +tempfail: + s->s_flags |= F_QUIT; + evbuffer_add_printf(s->s_bev->output, + "421 Service temporarily unavailable\r\n"); + return; } void @@ -709,8 +721,6 @@ read: s->s_tm = time(NULL); line = evbuffer_readline(bev->input); if (line == NULL) { - bufferevent_disable(s->s_bev, EV_READ); - bufferevent_enable(s->s_bev, EV_WRITE); return; } @@ -726,9 +736,8 @@ read: s->s_msg.datafp = NULL; /* Remove message file */ - imsg_compose(s->s_env->sc_ibufs[PROC_QUEUE], IMSG_QUEUE_DELETE_MESSAGE_FILE, + imsg_compose(s->s_env->sc_ibufs[PROC_QUEUE], IMSG_QUEUE_REMOVE_MESSAGE, 0, 0, -1, &s->s_msg, sizeof(s->s_msg)); - bufferevent_enable(s->s_bev, EV_WRITE); free(line); return; } @@ -749,7 +758,8 @@ read: for (i = 0; i < len; ++i) { if (line[i] & 0x80) { s->s_msg.status |= S_MESSAGE_PERMFAILURE; - strlcpy(s->s_msg.session_errorline, "8BIT data transfered over 7BIT limited channel", + strlcpy(s->s_msg.session_errorline, + "8BIT 
data transfered over 7BIT limited channel", sizeof s->s_msg.session_errorline); } } @@ -758,8 +768,6 @@ read: } goto read; } - bufferevent_disable(s->s_bev, EV_READ); - bufferevent_enable(s->s_bev, EV_WRITE); line[strcspn(line, "\r")] = '\0'; if ((ep = strchr(line, ':')) == NULL) @@ -783,20 +791,14 @@ session_write(struct bufferevent *bev, void *p) struct session *s = p; if (!(s->s_flags & F_QUIT)) { - if (! EVBUFFER_LENGTH(EVBUFFER_OUTPUT(bev))) { - bufferevent_disable(s->s_bev, EV_WRITE); - bufferevent_enable(s->s_bev, EV_READ); - } - + if (s->s_state == S_TLS) session_pickup(s, NULL); return; } - if (! EVBUFFER_LENGTH(EVBUFFER_OUTPUT(bev))) { - session_destroy(s); - } + session_destroy(s); } void @@ -811,13 +813,14 @@ session_destroy(struct session *s) if (s->s_msg.datafp != NULL) { fclose(s->s_msg.datafp); s->s_msg.datafp = NULL; - /* Remove message file */ - imsg_compose(s->s_env->sc_ibufs[PROC_QUEUE], IMSG_QUEUE_DELETE_MESSAGE_FILE, + } + + if (s->s_state > S_MAIL) { + imsg_compose(s->s_env->sc_ibufs[PROC_QUEUE], IMSG_QUEUE_REMOVE_MESSAGE, 0, 0, -1, &s->s_msg, sizeof(s->s_msg)); } if (s->s_bev != NULL) { - bufferevent_disable(s->s_bev, EV_READ|EV_WRITE); bufferevent_free(s->s_bev); } ssl_session_destroy(s); @@ -839,7 +842,7 @@ void session_msg_submit(struct session *s) { imsg_compose(s->s_env->sc_ibufs[PROC_QUEUE], - IMSG_QUEUE_MESSAGE_COMPLETE, 0, 0, -1, &s->s_msg, + IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &s->s_msg, sizeof(s->s_msg)); s->s_state = S_DONE; } diff --git a/usr.sbin/smtpd/smtpd.c b/usr.sbin/smtpd/smtpd.c index 335bb1e2ac7..1dd188dd22e 100644 --- a/usr.sbin/smtpd/smtpd.c +++ b/usr.sbin/smtpd/smtpd.c @@ -1,4 +1,4 @@ -/* $OpenBSD: smtpd.c,v 1.11 2008/11/22 22:22:05 gilles Exp $ */ +/* $OpenBSD: smtpd.c,v 1.12 2008/12/03 17:58:00 gilles Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> @@ -59,6 +59,7 @@ int parent_maildir_init(struct passwd *, char *); int parent_external_mda(struct batch *, struct path *); int check_child(pid_t, 
const char *); int setup_spool(uid_t, gid_t); +u_int16_t queue_message_hash(struct message *); pid_t lka_pid = 0; pid_t mfa_pid = 0; @@ -622,8 +623,9 @@ int setup_spool(uid_t uid, gid_t gid) { unsigned int n; - char *paths[] = { PATH_MESSAGES, PATH_LOCAL, PATH_RELAY, - PATH_DAEMON, PATH_ENVELOPES }; + char *paths[] = { PATH_INCOMING, PATH_QUEUE, + PATH_RUNQUEUE, PATH_RUNQUEUELOW, + PATH_RUNQUEUEHIGH }; char pathname[MAXPATHLEN]; struct stat sb; int ret; @@ -751,9 +753,14 @@ parent_open_message_file(struct batch *batchp) int fd; char pathname[MAXPATHLEN]; int spret; + u_int16_t hval; + struct message *messagep; - spret = snprintf(pathname, MAXPATHLEN, "%s%s/%s", - PATH_SPOOL, PATH_MESSAGES, batchp->message_id); + messagep = &batchp->message; + hval = queue_message_hash(messagep); + + spret = snprintf(pathname, MAXPATHLEN, "%s%s/%d/%s/message", + PATH_SPOOL, PATH_QUEUE, hval, batchp->message_id); if (spret == -1 || spret >= MAXPATHLEN) { batchp->message.status |= S_MESSAGE_PERMFAILURE; return -1; diff --git a/usr.sbin/smtpd/smtpd.h b/usr.sbin/smtpd/smtpd.h index a20582e0a25..a4c9b296431 100644 --- a/usr.sbin/smtpd/smtpd.h +++ b/usr.sbin/smtpd/smtpd.h @@ -1,4 +1,4 @@ -/* $OpenBSD: smtpd.h,v 1.16 2008/11/25 23:06:15 gilles Exp $ */ +/* $OpenBSD: smtpd.h,v 1.17 2008/12/03 17:58:00 gilles Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> @@ -43,14 +43,19 @@ #define RCPTBUFSZ 256 +#define DIRHASH_BUCKETS 4096 + #define PATH_SPOOL "/var/spool/smtpd" -#define PATH_MESSAGES "/messages" -#define PATH_LOCAL "/local" -#define PATH_RELAY "/relay" -#define PATH_DAEMON "/daemon" +#define PATH_INCOMING "/incoming" +#define PATH_QUEUE "/queue" +#define PATH_MESSAGE "/message" #define PATH_ENVELOPES "/envelopes" +#define PATH_RUNQUEUE "/runqueue" +#define PATH_RUNQUEUEHIGH "/runqueue-high" +#define PATH_RUNQUEUELOW "/runqueue-low" + /* used by newaliases */ #define PATH_ALIASES "/etc/mail/aliases" #define PATH_ALIASESDB "/etc/mail/aliases.db" @@ -173,6 +178,12 @@ 
enum imsg_type { IMSG_MFA_DATA_SUBMIT, IMSG_MFA_LOOKUP_MAIL, IMSG_MFA_LOOKUP_RCPT, + + IMSG_QUEUE_CREATE_MESSAGE, + IMSG_QUEUE_SUBMIT_ENVELOPE, + IMSG_QUEUE_REMOVE_MESSAGE, + IMSG_QUEUE_COMMIT_MESSAGE, + IMSG_QUEUE_REMOVE_SUBMISSION, IMSG_QUEUE_CREATE_MESSAGE_FILE, IMSG_QUEUE_DELETE_MESSAGE_FILE, @@ -183,6 +194,7 @@ enum imsg_type { IMSG_QUEUE_BATCH_COMPLETE, IMSG_QUEUE_BATCH_CLOSE, IMSG_QUEUE_MESSAGE_FD, + IMSG_QUEUE_MESSAGE_FILE, IMSG_QUEUE_ACCEPTED_CLOSE, IMSG_QUEUE_RETRY_CLOSE, @@ -195,6 +207,8 @@ enum imsg_type { IMSG_BATCH_APPEND, IMSG_BATCH_CLOSE, + IMSG_SMTP_MESSAGE_ID, + IMSG_SMTP_MESSAGE_FILE, IMSG_SMTP_SUBMIT_ACK, IMSG_SMTP_HOSTNAME_ANSWER, @@ -414,11 +428,10 @@ enum message_status { }; enum message_flags { - F_MESSAGE_COMPLETE = 0x1, - F_MESSAGE_RESOLVED = 0x2, - F_MESSAGE_READY = 0x4, - F_MESSAGE_EXPIRED = 0x8, - F_MESSAGE_PROCESSING = 0x10 + F_MESSAGE_RESOLVED = 0x1, + F_MESSAGE_EXPIRED = 0x2, + F_MESSAGE_SCHEDULED = 0x4, + F_MESSAGE_PROCESSING = 0x8 }; struct message { @@ -609,6 +622,7 @@ struct smtpd { u_int32_t sc_flags; struct timeval sc_qintval; struct event sc_ev; + struct event sc_rqev; int sc_pipes[PROC_COUNT] [PROC_COUNT][2]; struct imsgbuf *sc_ibufs[PROC_COUNT]; |