diff options
Diffstat (limited to 'usr.sbin/smtpd/queue.c')
-rw-r--r-- | usr.sbin/smtpd/queue.c | 1303 |
1 files changed, 828 insertions, 475 deletions
diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c index aa952c596ee..9021a8df666 100644 --- a/usr.sbin/smtpd/queue.c +++ b/usr.sbin/smtpd/queue.c @@ -1,4 +1,4 @@ -/* $OpenBSD: queue.c,v 1.16 2008/11/25 20:26:40 gilles Exp $ */ +/* $OpenBSD: queue.c,v 1.17 2008/12/03 17:58:00 gilles Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> @@ -54,27 +54,35 @@ void queue_dispatch_lka(int, short, void *); void queue_setup_events(struct smtpd *); void queue_disable_events(struct smtpd *); void queue_timeout(int, short, void *); -int queue_create_message_file(char *); -void queue_delete_message_file(char *); -int queue_record_submission(struct message *); -int queue_remove_submission(struct message *); -struct batch *batch_lookup(struct smtpd *, struct message *); -int batch_schedule(struct batch *, time_t); -void batch_unschedule(struct batch *); -void batch_send(struct smtpd *, struct batch *, time_t); -int queue_update_database(struct message *); +void queue_process_runqueue(int, short, void *); +int queue_create_incoming_layout(char *); +int queue_record_envelope(struct message *); +int queue_remove_envelope(struct message *); int queue_open_message_file(struct batch *); int queue_batch_resolved(struct smtpd *, struct batch *); +int queue_message_schedule(struct message *, time_t); +void queue_delete_message_file(char *); +u_int16_t queue_message_hash(struct message *); +int queue_record_incoming_envelope(struct message *); +int queue_update_incoming_envelope(struct message *); +int queue_remove_incoming_envelope(struct message *); +int queue_commit_incoming_message(struct message *); +void queue_delete_incoming_message(char *); +int queue_update_envelope(struct message *); +int queue_open_incoming_message_file(struct message *); +void queue_process(struct smtpd *); +int queue_process_bucket(struct smtpd *, u_int16_t); +int queue_process_message(struct smtpd *, char *); +void queue_process_envelope(struct smtpd *, char *, char *); +int queue_load_envelope(struct message *, char *); +void queue_delete_message(char *); + +void batch_send(struct smtpd *, struct batch *, time_t); +u_int32_t hash(u_int8_t *, size_t); struct batch *queue_record_batch(struct smtpd *, struct message *); struct batch *batch_by_id(struct smtpd *, u_int64_t); +struct batch *batch_lookup(struct smtpd *, struct message *); struct message *message_by_id(struct smtpd *, struct batch *, u_int64_t); -void debug_display_batch(struct batch *); -void debug_display_message(struct message *); -void queue_load_submissions(struct smtpd *, time_t); -int queue_message_schedule(struct message *, time_t); -int queue_message_from_id(char *, struct message *); -int queue_message_complete(struct message *); -int queue_init_submissions(void); void queue_sig_handler(int sig, short event, void *p) @@ -171,28 +179,32 @@ queue_dispatch_smtp(int sig, short event, void *p) break; switch (imsg.hdr.type) { - case IMSG_QUEUE_CREATE_MESSAGE_FILE: { + case IMSG_QUEUE_CREATE_MESSAGE: { struct message *messagep; struct submit_status ss; - int fd; log_debug("mfa_dispatch_smtp: creating message file"); messagep = imsg.data; ss.id = messagep->session_id; ss.code = 250; - fd = queue_create_message_file(ss.u.msgid); - imsg_compose(ibuf, IMSG_SMTP_MESSAGE_FILE, 0, 0, fd, + bzero(ss.u.msgid, MAXPATHLEN); + + if (! queue_create_incoming_layout(ss.u.msgid)) + ss.code = 421; + + imsg_compose(ibuf, IMSG_SMTP_MESSAGE_ID, 0, 0, -1, &ss, sizeof(ss)); break; } - case IMSG_QUEUE_DELETE_MESSAGE_FILE: { + case IMSG_QUEUE_REMOVE_MESSAGE: { struct message *messagep; messagep = imsg.data; - queue_delete_message_file(messagep->message_id); + if (messagep->message_id[0] != '\0') + queue_delete_incoming_message(messagep->message_id); break; } - case IMSG_QUEUE_MESSAGE_SUBMIT: { + case IMSG_QUEUE_SUBMIT_ENVELOPE: { struct message *messagep; struct submit_status ss; @@ -209,13 +221,17 @@ queue_dispatch_smtp(int sig, short event, void *p) messagep->type = T_MTA_MESSAGE; /* Write to disk */ - queue_record_submission(messagep); + if (! queue_record_incoming_envelope(messagep)) { + ss.code = 421; + imsg_compose(ibuf, IMSG_SMTP_SUBMIT_ACK, 0, 0, -1, + &ss, sizeof(ss)); + break; + } + imsg_compose(ibuf, IMSG_SMTP_SUBMIT_ACK, 0, 0, -1, &ss, sizeof(ss)); if (messagep->type & T_MTA_MESSAGE) { - messagep->flags |= F_MESSAGE_READY; - queue_update_database(messagep); break; } @@ -235,20 +251,37 @@ queue_dispatch_smtp(int sig, short event, void *p) break; } - case IMSG_QUEUE_MESSAGE_COMPLETE: { + case IMSG_QUEUE_COMMIT_MESSAGE: { struct message *messagep; struct submit_status ss; messagep = imsg.data; ss.id = messagep->session_id; - queue_message_complete(messagep); + if (! queue_commit_incoming_message(messagep)) + ss.code = 421; imsg_compose(ibuf, IMSG_SMTP_SUBMIT_ACK, 0, 0, -1, &ss, sizeof(ss)); break; } + case IMSG_QUEUE_MESSAGE_FILE: { + struct message *messagep; + struct submit_status ss; + int fd; + + messagep = imsg.data; + ss.id = messagep->session_id; + + fd = queue_open_incoming_message_file(messagep); + if (fd == -1) + ss.code = 421; + + imsg_compose(ibuf, IMSG_SMTP_MESSAGE_FILE, 0, 0, fd, + &ss, sizeof(ss)); + break; + } default: log_debug("queue_dispatch_smtp: unexpected imsg %d", imsg.hdr.type); @@ -306,7 +339,7 @@ queue_dispatch_mda(int sig, short event, void *p) if (messagep->status & S_MESSAGE_TEMPFAILURE) { messagep->status &= ~S_MESSAGE_TEMPFAILURE; messagep->flags &= ~F_MESSAGE_PROCESSING; - queue_update_database(messagep); + queue_update_envelope(messagep); break; } @@ -319,18 +352,17 @@ queue_dispatch_mda(int sig, short event, void *p) messagep->id = queue_generate_id(); messagep->batch_id = 0; messagep->type |= T_DAEMON_MESSAGE; - messagep->flags |= F_MESSAGE_READY; messagep->lasttry = 0; messagep->retry = 0; - queue_record_submission(messagep); + queue_record_envelope(messagep); *messagep = msave; } - queue_remove_submission(messagep); + queue_remove_envelope(messagep); break; } - /* no error, remove submission */ - queue_remove_submission(messagep); + /* no error, remove envelope */ + queue_remove_envelope(messagep); break; } @@ -402,7 +434,7 @@ queue_dispatch_mta(int sig, short event, void *p) if (messagep->status & S_MESSAGE_TEMPFAILURE) { messagep->status &= ~S_MESSAGE_TEMPFAILURE; messagep->flags &= ~F_MESSAGE_PROCESSING; - queue_update_database(messagep); + queue_update_envelope(messagep); break; } @@ -415,18 +447,17 @@ queue_dispatch_mta(int sig, short event, void *p) messagep->id = queue_generate_id(); messagep->batch_id = 0; messagep->type |= T_DAEMON_MESSAGE; - messagep->flags |= F_MESSAGE_READY; messagep->lasttry = 0; messagep->retry = 0; - queue_record_submission(messagep); + queue_record_envelope(messagep); *messagep = msave; } - queue_remove_submission(messagep); + queue_remove_envelope(messagep); break; } - /* no error, remove submission */ - queue_remove_submission(messagep); + /* no error, remove envelope */ + queue_remove_envelope(messagep); break; } @@ -483,12 +514,7 @@ queue_dispatch_lka(int sig, short event, void *p) messagep = imsg.data; messagep->id = queue_generate_id(); messagep->batch_id = 0; - queue_record_submission(messagep); - - if (messagep->type & T_MTA_MESSAGE) { - messagep->flags |= F_MESSAGE_READY; - queue_update_database(messagep); - } + queue_record_incoming_envelope(messagep); if (messagep->type & T_MDA_MESSAGE) { imsg_compose(ibuf, IMSG_LKA_FORWARD_LOOKUP, 0, 0, -1, @@ -503,8 +529,7 @@ queue_dispatch_lka(int sig, short event, void *p) messagep = (struct message *)imsg.data; messagep->id = queue_generate_id(); messagep->batch_id = 0; - messagep->flags |= F_MESSAGE_READY; - queue_record_submission(messagep); + queue_record_incoming_envelope(messagep); break; } @@ -512,7 +537,7 @@ queue_dispatch_lka(int sig, short event, void *p) struct message *messagep; messagep = (struct message *)imsg.data; - queue_remove_submission(messagep); + queue_remove_incoming_envelope(messagep); break; } @@ -543,9 +568,14 @@ queue_setup_events(struct smtpd *env) struct timeval tv; evtimer_set(&env->sc_ev, queue_timeout, env); - tv.tv_sec = 1; - tv.tv_usec = 0; + tv.tv_sec = 0; + tv.tv_usec = 10; evtimer_add(&env->sc_ev, &tv); + + evtimer_set(&env->sc_rqev, queue_process_runqueue, env); + tv.tv_sec = 0; + tv.tv_usec = 10; + evtimer_add(&env->sc_rqev, &tv); } void @@ -558,12 +588,13 @@ void queue_timeout(int fd, short event, void *p) { struct smtpd *env = p; - struct batch *batchp, *nxt; struct timeval tv; time_t curtime; + struct batch *batchp, *nxt; + + queue_process(env); curtime = time(NULL); - queue_load_submissions(env, curtime); for (batchp = SPLAY_MIN(batchtree, &env->batch_queue); batchp != NULL; @@ -582,95 +613,11 @@ queue_timeout(int fd, short event, void *p) } - tv.tv_sec = 5; - tv.tv_usec = 0; + tv.tv_sec = 0; + tv.tv_usec = 10; evtimer_add(&env->sc_ev, &tv); } -void -queue_load_submissions(struct smtpd *env, time_t tm) -{ - DIR *dirp; - struct dirent *dp; - struct batch *batchp; - struct message *messagep; - struct message message; - - dirp = opendir(PATH_ENVELOPES); - if (dirp == NULL) - err(1, "opendir"); - - while ((dp = readdir(dirp)) != NULL) { - - if (dp->d_name[0] == '.') - continue; - - if (! queue_message_from_id(dp->d_name, &message)) { - warnx("failed to load message \"%s\"", dp->d_name); - continue; - } - - if (! queue_message_schedule(&message, tm)) { - if (message.flags & F_MESSAGE_EXPIRED) { - log_debug("message expired, create mdaemon"); - queue_remove_submission(&message); - } - continue; - } - - message.lasttry = tm; - message.flags |= F_MESSAGE_PROCESSING; - queue_update_database(&message); - - messagep = calloc(1, sizeof (struct message)); - if (messagep == NULL) - err(1, "calloc"); - *messagep = message; - - batchp = batch_lookup(env, messagep); - if (batchp != NULL) - messagep->batch_id = batchp->id; - - batchp = queue_record_batch(env, messagep); - if (messagep->batch_id == 0) - messagep->batch_id = batchp->id; - } - - closedir(dirp); -} - -int -queue_message_from_id(char *message_id, struct message *message) -{ - int spret; - char pathname[MAXPATHLEN]; - FILE *fp; - - spret = snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_ENVELOPES, message_id); - if (spret == -1 || spret >= MAXPATHLEN) { - warnx("queue_load_submissions: filename too long."); - return 0; - } - - fp = fopen(pathname, "r"); - if (fp == NULL) { - warnx("queue_load_submissions: fopen: %s", message_id); - goto bad; - } - - if (fread(message, sizeof(struct message), 1, fp) != 1) { - warnx("queue_load_submissions: fread: %s", message_id); - goto bad; - } - - fclose(fp); - return 1; -bad: - if (fp != NULL) - fclose(fp); - return 0; -} - pid_t queue(struct smtpd *env) { @@ -722,9 +669,6 @@ queue(struct smtpd *env) SPLAY_INIT(&env->batch_queue); - queue_init_submissions(); - queue_load_submissions(env, time(NULL)); - event_init(); signal_set(&ev_sigint, SIGINT, queue_sig_handler, env); @@ -743,145 +687,272 @@ queue(struct smtpd *env) return (0); } -u_int64_t -queue_generate_id(void) +void +queue_process(struct smtpd *env) { - u_int64_t id; - struct timeval tp; + u_int16_t cbucket = 0; + static u_int16_t lbucket = 0; + DIR *dirp; + struct dirent *dp; + const char *errstr; + static u_int8_t bucketdone = 1; - if (gettimeofday(&tp, NULL) == -1) - fatal("queue_generate_id: time"); + if (! bucketdone) { + bucketdone = queue_process_bucket(env, lbucket); + if (bucketdone) + lbucket = (lbucket + 1) % DIRHASH_BUCKETS; + return; + } - id = (u_int32_t)tp.tv_sec; - id <<= 32; - id |= (u_int32_t)tp.tv_usec; - usleep(1); + dirp = opendir(PATH_QUEUE); + if (dirp == NULL) + fatal("queue_process: opendir"); - return (id); + while ((dp = readdir(dirp)) != NULL) { + + if (strcmp(dp->d_name, ".") == 0 || + strcmp(dp->d_name, "..") == 0) + continue; + + cbucket = strtonum(dp->d_name, 0, DIRHASH_BUCKETS - 1, &errstr); + if (errstr) { + log_warn("queue_process: %s/%s is not a valid bucket", + PATH_QUEUE, dp->d_name); + continue; + } + + if (cbucket == lbucket) + break; + } + closedir(dirp); + + if (dp == NULL) { + lbucket = (lbucket + 1) % DIRHASH_BUCKETS; + return; + } + + bucketdone = queue_process_bucket(env, cbucket); + if (bucketdone) + lbucket = (lbucket + 1) % DIRHASH_BUCKETS; } int -queue_create_message_file(char *message_id) +queue_process_bucket(struct smtpd *env, u_int16_t bucket) { - int fd; - char pathname[MAXPATHLEN]; int spret; + static DIR *dirp = NULL; + struct dirent *dp; + static char *msgid = NULL; + char bucketpath[MAXPATHLEN]; + static u_int8_t messagedone = 1; - spret = snprintf(pathname, MAXPATHLEN, "%s/%d.XXXXXXXXXXXXXXXX", - PATH_MESSAGES, time(NULL)); + if (! messagedone) { + messagedone = queue_process_message(env, msgid); + if (! messagedone) + return 0; + msgid = NULL; + } + + spret = snprintf(bucketpath, MAXPATHLEN, "%s/%d", PATH_QUEUE, bucket); if (spret == -1 || spret >= MAXPATHLEN) - return -1; + fatal("queue_process_bucket: snprintf"); + + if (dirp == NULL) { + dirp = opendir(bucketpath); + if (dirp == NULL) + fatal("queue_process_bucket: opendir"); + } - fd = mkstemp(pathname); - if (fd == -1) - fatal("queue_create_message_file: mkstemp"); + while ((dp = readdir(dirp)) != NULL) { - /* XXX - this won't fail if message_id is MAXPATHLEN bytes */ - if (strlcpy(message_id, pathname + sizeof(PATH_MESSAGES), MAXPATHLEN) - >= MAXPATHLEN) - fatal("queue_create_message_file: message id too long"); + if (strcmp(dp->d_name, ".") == 0 || + strcmp(dp->d_name, "..") == 0) + continue; - return fd; + break; + } + + if (dp != NULL) { + msgid = dp->d_name; + messagedone = queue_process_message(env, msgid); + if (! messagedone) + return 0; + msgid = NULL; + } + + closedir(dirp); + dirp = NULL; + return 1; } -void -queue_delete_message_file(char *message_id) +int +queue_process_message(struct smtpd *env, char *messageid) { - char pathname[MAXPATHLEN]; int spret; + static DIR *dirp = NULL; + struct dirent *dp; + char evppath[MAXPATHLEN]; + u_int16_t hval = 0; - spret = snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_MESSAGES, message_id); + hval = hash(messageid, strlen(messageid)) % DIRHASH_BUCKETS; + + spret = snprintf(evppath, MAXPATHLEN, "%s/%d/%s%s", PATH_QUEUE, hval, + messageid, PATH_ENVELOPES); if (spret == -1 || spret >= MAXPATHLEN) - fatal("queue_delete_message_file: message id too long"); + fatal("queue_process_message: snprintf"); - if (unlink(pathname) == -1) - fatal("queue_delete_message_file: unlink"); + if (dirp == NULL) { + dirp = opendir(evppath); + if (dirp == NULL) + fatal("queue_process_message: opendir"); + } + + while ((dp = readdir(dirp)) != NULL) { + + if (strcmp(dp->d_name, ".") == 0 || + strcmp(dp->d_name, "..") == 0) + continue; + break; + } + + if (dp != NULL) { + queue_process_envelope(env, messageid, dp->d_name); + return 0; + } + + closedir(dirp); + dirp = NULL; + return 1; } -int -queue_record_submission(struct message *message) +void +queue_process_envelope(struct smtpd *env, char *msgid, char *evpid) { - char pathname[MAXPATHLEN]; - char linkname[MAXPATHLEN]; - char dbname[MAXPATHLEN]; - char message_uid[MAXPATHLEN]; - char *spool; - size_t spoolsz; - int fd; - int mode = O_CREAT|O_TRUNC|O_WRONLY|O_EXCL|O_SYNC; int spret; - FILE *fp; + struct message message; + time_t tm; + char evppath[MAXPATHLEN]; + char rqpath[MAXPATHLEN]; + u_int16_t hval; + struct stat sb; - if (message->type & T_DAEMON_MESSAGE) { - spool = PATH_DAEMON; + if (! queue_load_envelope(&message, evpid)) { + log_debug("failed to load envelope: %s", evpid); + return; } - else { - switch (message->recipient.rule.r_action) { - case A_MBOX: - case A_MAILDIR: - case A_EXT: - spool = PATH_LOCAL; - break; - default: - spool = PATH_RELAY; + + tm = time(NULL); + + if (! queue_message_schedule(&message, tm)) { + if (message.flags & F_MESSAGE_EXPIRED) { + log_debug("message has expired, mdaemon"); + queue_remove_envelope(&message); } + return; } - spoolsz = strlen(spool); - spret = snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_MESSAGES, - message->message_id); + message.flags |= F_MESSAGE_SCHEDULED; + queue_update_envelope(&message); + + log_debug("SCHEDULED: %s", evpid); + hval = hash(msgid, strlen(msgid)) % DIRHASH_BUCKETS; + spret = snprintf(evppath, MAXPATHLEN, "%s/%d/%s%s/%s", PATH_QUEUE, hval, + msgid, PATH_ENVELOPES, evpid); if (spret == -1 || spret >= MAXPATHLEN) - fatal("queue_record_submission: message id too long"); + fatal("queue_process_envelope: snprintf"); - for (;;) { - spret = snprintf(linkname, MAXPATHLEN, "%s/%s.%qu", spool, - message->message_id, (u_int64_t)arc4random()); - if (spret == -1 || spret >= MAXPATHLEN) - fatal("queue_record_submission: message uid too long"); + spret = snprintf(rqpath, MAXPATHLEN, "%s/%s", PATH_RUNQUEUE, evpid); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_process_envelope: snprintf"); - (void)strlcpy(message_uid, linkname + spoolsz + 1, MAXPATHLEN); + if (stat(rqpath, &sb) == -1) { + if (errno != ENOENT) + fatal("queue_process_envelope: stat"); - if (link(pathname, linkname) == -1) { - if (errno == EEXIST) - continue; - err(1, "link: %s , %s", pathname, linkname); + if (symlink(evppath, rqpath) == -1) { + log_info("queue_process_envelope: " + "failed to place envelope in runqueue"); } + } +} - spret = snprintf(dbname, MAXPATHLEN, "%s/%s", PATH_ENVELOPES, - message_uid); - if (spret == -1 || spret >= MAXPATHLEN) - fatal("queue_record_submission: database uid too long"); - - fd = open(dbname, mode, 0600); - if (fd == -1) - if (unlink(linkname) == -1) - fatal("queue_record_submission: unlink"); +void +queue_process_runqueue(int fd, short event, void *p) +{ + DIR *dirp; + struct dirent *dp; + struct message message; + struct message *messagep; + struct batch *batchp; + char pathname[MAXPATHLEN]; + time_t tm; + struct smtpd *env = p; + struct timeval tv; - if (flock(fd, LOCK_EX) == -1) - fatal("queue_record_submission: flock"); + tm = time(NULL); - fp = fdopen(fd, "w"); - if (fp == NULL) - fatal("fdopen"); + dirp = opendir(PATH_RUNQUEUE); + if (dirp == NULL) + fatal("queue_process_runqueue: opendir"); - if (strlcpy(message->message_uid, message_uid, MAXPATHLEN) - >= MAXPATHLEN) - fatal("queue_record_submission: message uid too long"); + while ((dp = readdir(dirp)) != NULL) { + if (strcmp(dp->d_name, ".") == 0 || + strcmp(dp->d_name, "..") == 0) + continue; - message->creation = time(NULL); + /* XXX */ + snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_RUNQUEUE, dp->d_name); + unlink(pathname); - if (fwrite(message, sizeof(struct message), 1, fp) != 1) { - fclose(fp); - unlink(dbname); - return 0; + if (! queue_load_envelope(&message, dp->d_name)) { + log_debug("failed to load envelope"); + continue; } - fflush(fp); - fsync(fd); - fclose(fp); - break; + if (message.flags & F_MESSAGE_PROCESSING) + continue; + + message.lasttry = tm; + message.flags &= ~F_MESSAGE_SCHEDULED; + message.flags |= F_MESSAGE_PROCESSING; + queue_update_envelope(&message); + + messagep = calloc(1, sizeof (struct message)); + if (messagep == NULL) + err(1, "calloc"); + *messagep = message; + + batchp = batch_lookup(env, messagep); + if (batchp != NULL) + messagep->batch_id = batchp->id; + + batchp = queue_record_batch(env, messagep); + if (messagep->batch_id == 0) + messagep->batch_id = batchp->id; } - return 1; + + closedir(dirp); + + tv.tv_sec = 0; + tv.tv_usec = 10; + evtimer_add(&env->sc_rqev, &tv); +} + +u_int64_t +queue_generate_id(void) +{ + u_int64_t id; + struct timeval tp; + + if (gettimeofday(&tp, NULL) == -1) + fatal("queue_generate_id: time"); + + id = (u_int32_t)tp.tv_sec; + id <<= 32; + id |= (u_int32_t)tp.tv_usec; + usleep(1); + + return (id); } struct batch * @@ -941,71 +1012,6 @@ queue_record_batch(struct smtpd *env, struct message *messagep) } int -queue_remove_submission(struct message *message) -{ - char pathname[MAXPATHLEN]; - char linkname[MAXPATHLEN]; - char dbname[MAXPATHLEN]; - char *spool; - struct stat sb; - int spret; - - if (message->type & T_DAEMON_MESSAGE) { - spool = PATH_DAEMON; - } - else { - switch (message->recipient.rule.r_action) { - case A_MBOX: - case A_MAILDIR: - case A_EXT: - spool = PATH_LOCAL; - break; - default: - spool = PATH_RELAY; - } - } - - spret = snprintf(dbname, MAXPATHLEN, "%s/%s", PATH_ENVELOPES, - message->message_uid); - if (spret == -1 || spret >= MAXPATHLEN) - fatal("queue_remove_submission: database uid too long"); - - spret = snprintf(linkname, MAXPATHLEN, "%s/%s", spool, - message->message_uid); - if (spret == -1 || spret >= MAXPATHLEN) - fatal("queue_remove_submission: message uid too long"); - - spret = snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_MESSAGES, - message->message_id); - if (spret == -1 || spret >= MAXPATHLEN) - fatal("queue_remove_submission: message id too long"); - - if (unlink(dbname) == -1) { - warnx("dbname: %s", dbname); - fatal("queue_remove_submission: unlink"); - } - - if (unlink(linkname) == -1) { - warnx("linkname: %s", linkname); - fatal("queue_remove_submission: unlink"); - } - - if (stat(pathname, &sb) == -1) { - warnx("pathname: %s", pathname); - fatal("queue_remove_submission: stat"); - } - - if (sb.st_nlink == 1) { - if (unlink(pathname) == -1) { - warnx("pathname: %s", pathname); - fatal("queue_remove_submission: unlink"); - } - } - - return 1; -} - -int queue_remove_batch_message(struct smtpd *env, struct batch *batchp, struct message *messagep) { TAILQ_REMOVE(&batchp->messages, messagep, entry); @@ -1082,75 +1088,6 @@ queue_batch_resolved(struct smtpd *env, struct batch *lookup) return 1; } -int -queue_open_message_file(struct batch *batch) -{ - int fd; - char pathname[MAXPATHLEN]; - int spret; - - spret = snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_MESSAGES, - batch->message_id); - if (spret == -1 || spret >= MAXPATHLEN) - fatal("queue_open_message_file: message id too long"); - - fd = open(pathname, O_RDONLY); - if (fd == -1) - fatal("queue_open_message_file: open"); - - return fd; -} - -int -queue_update_database(struct message *message) -{ - int fd; - char *spool; - char pathname[MAXPATHLEN]; - int spret; - FILE *fp; - mode_t mode = O_RDWR; - - if (message->type & T_DAEMON_MESSAGE) { - spool = PATH_DAEMON; - } - else { - switch (message->recipient.rule.r_action) { - case A_MBOX: - case A_MAILDIR: - case A_EXT: - spool = PATH_LOCAL; - break; - default: - spool = PATH_RELAY; - } - } - - spret = snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_ENVELOPES, - message->message_uid); - if (spret == -1 || spret >= MAXPATHLEN) - fatal("queue_update_database: pathname too long"); - - if ((fd = open(pathname, mode)) == -1) - fatal("queue_update_database: cannot open database"); - - - if (flock(fd, LOCK_EX) == -1) - fatal("queue_update_database: flock"); - - fp = fdopen(fd, "w"); - if (fp == NULL) - fatal("fdopen"); - - if (fwrite(message, sizeof(struct message), 1, fp) != 1) - fatal("queue_update_database: cannot write database"); - fflush(fp); - fsync(fd); - fclose(fp); - - return 1; -} - struct batch * batch_lookup(struct smtpd *env, struct message *message) { @@ -1201,98 +1138,6 @@ batch_cmp(struct batch *s1, struct batch *s2) } int -queue_init_submissions(void) -{ - DIR *dirp; - struct dirent *dp; - struct message message; - char pathname[MAXPATHLEN]; - FILE *fp; - int spret; - - dirp = opendir(PATH_ENVELOPES); - if (dirp == NULL) - err(1, "opendir"); - - while ((dp = readdir(dirp)) != NULL) { - - if (dp->d_name[0] == '.') - continue; - - spret = snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_ENVELOPES, - dp->d_name); - if (spret == -1 || spret >= MAXPATHLEN) - continue; - - fp = fopen(pathname, "r"); - if (fp == NULL) - continue; - - if (fread(&message, sizeof(struct message), 1, fp) != 1) { - fclose(fp); - continue; - } - fclose(fp); - - if ((message.flags & F_MESSAGE_COMPLETE) == 0) - unlink(pathname); - else { - message.flags &= ~F_MESSAGE_PROCESSING; - queue_update_database(&message); - } - } - - closedir(dirp); - return 1; -} - -int -queue_message_complete(struct message *messagep) -{ - DIR *dirp; - struct dirent *dp; - struct message message; - char pathname[MAXPATHLEN]; - FILE *fp; - int spret; - - dirp = opendir(PATH_ENVELOPES); - if (dirp == NULL) - err(1, "opendir"); - - while ((dp = readdir(dirp)) != NULL) { - - if (dp->d_name[0] == '.') - continue; - - if (strncmp(messagep->message_id, - dp->d_name, strlen(messagep->message_id)) != 0) - continue; - - spret = snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_ENVELOPES, - dp->d_name); - if (spret == -1 || spret >= MAXPATHLEN) - continue; - - fp = fopen(pathname, "r"); - if (fp == NULL) - continue; - - if (fread(&message, sizeof(struct message), 1, fp) != 1) { - fclose(fp); - continue; - } - fclose(fp); - - message.flags |= F_MESSAGE_COMPLETE; - queue_update_database(&message); - } - - closedir(dirp); - return 1; -} - -int queue_message_schedule(struct message *messagep, time_t tm) { time_t delay; @@ -1307,11 +1152,8 @@ queue_message_schedule(struct message *messagep, time_t tm) messagep->flags |= F_MESSAGE_EXPIRED; return 0; } - - if ((messagep->flags & F_MESSAGE_READY) == 0) - return 0; - - if ((messagep->flags & F_MESSAGE_COMPLETE) == 0) + + if ((messagep->flags & F_MESSAGE_SCHEDULED) != 0) return 0; if ((messagep->flags & F_MESSAGE_PROCESSING) != 0) @@ -1347,12 +1189,6 @@ queue_message_schedule(struct message *messagep, time_t tm) } void -batch_unschedule(struct batch *batchp) -{ - batchp->flags &= ~(F_BATCH_SCHEDULED); -} - -void batch_send(struct smtpd *env, struct batch *batchp, time_t curtime) { u_int8_t proctype; @@ -1390,6 +1226,7 @@ batch_by_id(struct smtpd *env, u_int64_t id) return SPLAY_FIND(batchtree, &env->batch_queue, &lookup); } + struct message * message_by_id(struct smtpd *env, struct batch *batchp, u_int64_t id) { @@ -1412,4 +1249,520 @@ message_by_id(struct smtpd *env, struct batch *batchp, u_int64_t id) return NULL; } +int +queue_create_incoming_layout(char *message_id) +{ + int spret; + char rootdir[MAXPATHLEN]; + char evpdir[MAXPATHLEN]; + + spret = snprintf(rootdir, MAXPATHLEN, "%s/%d.XXXXXXXXXXXXXXXX", + PATH_INCOMING, time(NULL)); + if (spret == -1 || spret >= MAXPATHLEN) + return -1; + + if (mkdtemp(rootdir) == NULL) + return -1; + + if (strlcpy(message_id, rootdir + strlen(PATH_INCOMING) + 1, MAXPATHLEN) + >= MAXPATHLEN) + goto badroot; + + spret = snprintf(evpdir, MAXPATHLEN, "%s%s", + rootdir, PATH_ENVELOPES); + if (spret == -1 || spret >= MAXPATHLEN) + goto badroot; + + if (mkdir(evpdir, 0700) == -1) + goto badroot; + + return 1; + +badroot: + if (rmdir(rootdir) == -1) + fatal("queue_create_incoming_layout: rmdir"); + + return 0; +} + +void +queue_delete_incoming_message(char *message_id) +{ + int spret; + char rootdir[MAXPATHLEN]; + char evpdir[MAXPATHLEN]; + char evppath[MAXPATHLEN]; + char msgpath[MAXPATHLEN]; + DIR *dirp; + struct dirent *dp; + + spret = snprintf(rootdir, MAXPATHLEN, "%s/%s", PATH_INCOMING, + message_id); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_delete_incoming_message: snprintf"); + + spret = snprintf(evpdir, MAXPATHLEN, "%s%s", + rootdir, PATH_ENVELOPES); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_delete_incoming_message: snprintf"); + + spret = snprintf(msgpath, MAXPATHLEN, "%s/message", rootdir); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_delete_incoming_message: snprintf"); + + if (unlink(msgpath) == -1) { + if (errno != ENOENT) + fatal("queue_delete_incoming_message: unlink"); + } + + log_debug("evpdir: %s", evpdir); + dirp = opendir(evpdir); + if (dirp == NULL) + fatal("queue_delete_incoming_message: opendir"); + while ((dp = readdir(dirp)) != NULL) { + if (strcmp(dp->d_name, ".") == 0 || + strcmp(dp->d_name, "..") == 0) + continue; + spret = snprintf(evppath, MAXPATHLEN, "%s/%s", evpdir, dp->d_name); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_create_incoming_message: snprintf"); + + if (unlink(evppath) == -1) { + if (errno != ENOENT) + fatal("queue_create_incoming_message: unlink"); + } + } + closedir(dirp); + + if (rmdir(evpdir) == -1) + if (errno != ENOENT) + fatal("queue_create_incoming_message: rmdir"); + + if (rmdir(rootdir) == -1) + if (errno != ENOENT) + fatal("queue_create_incoming_message: rmdir"); + + return; +} + +int +queue_record_incoming_envelope(struct message *message) +{ + char rootdir[MAXPATHLEN]; + char evpdir[MAXPATHLEN]; + char evpname[MAXPATHLEN]; + char message_uid[MAXPATHLEN]; + int fd; + int mode = O_CREAT|O_TRUNC|O_WRONLY|O_EXCL|O_SYNC; + int spret; + FILE *fp; + int ret; + + spret = snprintf(rootdir, MAXPATHLEN, "%s/%s", PATH_INCOMING, + message->message_id); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_record_incoming_envelope: snprintf"); + + spret = snprintf(evpdir, MAXPATHLEN, "%s%s", rootdir, PATH_ENVELOPES); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_record_incoming_envelope: snprintf"); + + for (;;) { + spret = snprintf(evpname, MAXPATHLEN, "%s/%s.%qu", evpdir, + message->message_id, (u_int64_t)arc4random()); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_record_incoming_envelope: snprintf"); + + (void)strlcpy(message_uid, evpname + strlen(evpdir) + 1, MAXPATHLEN); + + fd = open(evpname, mode, 0600); + if (fd == -1) { + if (errno == EEXIST) + continue; + return 0; + } + + if (flock(fd, LOCK_EX) == -1) + fatal("queue_record_submission: flock"); + + fp = fdopen(fd, "w"); + if (fp == NULL) + fatal("fdopen"); + + if (strlcpy(message->message_uid, message_uid, MAXPATHLEN) + >= MAXPATHLEN) + fatal("queue_record_submission: strlcpy"); + + message->creation = time(NULL); + + if ((ret = fwrite(message, sizeof (struct message), 1, fp)) != 1) { + fclose(fp); + unlink(evpname); + return 0; + } + fflush(fp); + fsync(fd); + fclose(fp); + + break; + } + return 1; +} + +int +queue_update_incoming_envelope(struct message *messagep) +{ + int fd; + char pathname[MAXPATHLEN]; + int spret; + FILE *fp; + mode_t mode = O_RDWR; + + spret = snprintf(pathname, MAXPATHLEN, "%s/%s%s/%s", PATH_INCOMING, + messagep->message_id, PATH_ENVELOPES, messagep->message_uid); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_update_incoming_envelope: snprintf"); + + if ((fd = open(pathname, mode)) == -1) + fatal("queue_update_incoming_envelope: open"); + + if (flock(fd, LOCK_EX) == -1) + fatal("queue_update_incoming_envelope: flock"); + + fp = fdopen(fd, "w"); + if (fp == NULL) + fatal("queue_update_incoming_envelope: fdopen"); + + if (fwrite(messagep, sizeof(struct message), 1, fp) != 1) + fatal("queue_update_incoming_envelope: fwrite"); + fflush(fp); + fsync(fd); + fclose(fp); + + return 1; +} + +int +queue_remove_incoming_envelope(struct message *message) +{ + char pathname[MAXPATHLEN]; + int spret; + + spret = snprintf(pathname, MAXPATHLEN, "%s/%s%s/%s", PATH_INCOMING, + message->message_id, PATH_ENVELOPES, message->message_uid); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_remove_incoming_envelope: snprintf"); + + if (unlink(pathname) == -1) + if (errno != ENOENT) + fatal("queue_remove_incoming_envelope: unlink"); + + return 1; +} + +int +queue_commit_incoming_message(struct message *messagep) +{ + int spret; + char rootdir[MAXPATHLEN]; + char queuedir[MAXPATHLEN]; + u_int16_t hval; + + spret = snprintf(rootdir, MAXPATHLEN, "%s/%s", PATH_INCOMING, + messagep->message_id); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_commit_message_incoming: snprintf"); + + hval = queue_message_hash(messagep); + + spret = snprintf(queuedir, MAXPATHLEN, "%s/%d", PATH_QUEUE, hval); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_commit_message_incoming: snprintf"); + + if (mkdir(queuedir, 0700) == -1) { + if (errno == ENOSPC) + return 0; + if (errno != EEXIST) + fatal("queue_commit_message_incoming: mkdir"); + } + + spret = snprintf(queuedir, MAXPATHLEN, "%s/%d/%s", PATH_QUEUE, hval, + messagep->message_id); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_commit_message_incoming: snprintf"); + + + if (rename(rootdir, queuedir) == -1) + fatal("queue_commit_message_incoming: rename"); + + return 1; +} + +int +queue_open_incoming_message_file(struct message *messagep) +{ + char pathname[MAXPATHLEN]; + int spret; + mode_t mode = O_CREAT|O_EXCL|O_RDWR; + + spret = snprintf(pathname, MAXPATHLEN, "%s/%s/message", PATH_INCOMING, + messagep->message_id); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_open_incoming_message_file: snprintf"); + + return open(pathname, mode, 0600); +} + +int +queue_record_envelope(struct message *messagep) +{ + char queuedir[MAXPATHLEN]; + char evpdir[MAXPATHLEN]; + char evpname[MAXPATHLEN]; + char message_uid[MAXPATHLEN]; + int fd; + int mode = O_CREAT|O_TRUNC|O_WRONLY|O_EXCL|O_SYNC; + int spret; + FILE *fp; + int ret; + u_int16_t hval; + + spret = snprintf(queuedir, MAXPATHLEN, "%s/%s", PATH_QUEUE, + messagep->message_id); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_record_envelope: snprintf"); + + hval = queue_message_hash(messagep); + + spret = snprintf(queuedir, MAXPATHLEN, "%s/%d", PATH_QUEUE, hval); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_record_envelope: snprintf"); + + spret = snprintf(evpdir, MAXPATHLEN, "%s/%s%s", queuedir, messagep->message_id, + PATH_ENVELOPES); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_record_envelope: snprintf"); + + for (;;) { + spret = snprintf(evpname, MAXPATHLEN, "%s/%s.%qu", evpdir, + messagep->message_id, (u_int64_t)arc4random()); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_record_envelope: snprintf"); + + (void)strlcpy(message_uid, evpname + strlen(evpdir) + 1, MAXPATHLEN); + + fd = open(evpname, mode, 0600); + if (fd == -1) { + if (errno == EEXIST) + continue; + log_debug("failed to open %s", evpname); + fatal("queue_record_envelope: open"); + } + + if (flock(fd, LOCK_EX) == -1) + fatal("queue_record_envelope: flock"); + + fp = fdopen(fd, "w"); + if (fp == NULL) + fatal("fdopen"); + + if (strlcpy(messagep->message_uid, message_uid, MAXPATHLEN) + >= MAXPATHLEN) + fatal("queue_record_envelope: strlcpy"); + + messagep->creation = time(NULL); + + if ((ret = fwrite(messagep, sizeof (struct message), 1, fp)) != 1) { + fclose(fp); + unlink(evpname); + return 0; + } + fflush(fp); + fsync(fd); + fclose(fp); + + break; + } + return 1; + +} + +int +queue_remove_envelope(struct message *messagep) +{ + char pathname[MAXPATHLEN]; + u_int16_t hval; + int spret; + + hval = queue_message_hash(messagep); + + spret = snprintf(pathname, MAXPATHLEN, "%s/%d/%s%s/%s", PATH_QUEUE, + hval, messagep->message_id, PATH_ENVELOPES, messagep->message_uid); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_remove_incoming_envelope: snprintf"); + + if (unlink(pathname) == -1) + fatal("queue_remove_incoming_envelope: unlink"); + + spret = snprintf(pathname, MAXPATHLEN, "%s/%d/%s%s", PATH_QUEUE, + hval, messagep->message_id, PATH_ENVELOPES); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_remove_incoming_envelope: snprintf"); + + if (rmdir(pathname) != -1) + queue_delete_message(messagep->message_id); + + return 1; +} + +int +queue_update_envelope(struct message *messagep) +{ + int fd; + char pathname[MAXPATHLEN]; + int spret; + FILE *fp; + mode_t mode = O_RDWR; + u_int16_t hval; + + hval = queue_message_hash(messagep); + + spret = snprintf(pathname, MAXPATHLEN, "%s/%d/%s%s/%s", PATH_QUEUE, + hval, messagep->message_id, PATH_ENVELOPES, messagep->message_uid); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_update_envelope: snprintf"); + + if ((fd = open(pathname, mode)) == -1) + fatal("queue_update_envelope: open"); + + if (flock(fd, LOCK_EX) == -1) + fatal("queue_update_envelope: flock"); + + fp = fdopen(fd, "w"); + if (fp == NULL) + fatal("queue_update_envelope: fdopen"); + + if (fwrite(messagep, sizeof(struct message), 1, fp) != 1) + fatal("queue_update_envelope: fwrite"); + fflush(fp); + fsync(fd); + fclose(fp); + + return 1; +} + +int +queue_load_envelope(struct message *messagep, char *evpid) +{ + int spret; + char pathname[MAXPATHLEN]; + u_int16_t hval; + FILE *fp; + char msgid[MAXPATHLEN]; + + strlcpy(msgid, evpid, MAXPATHLEN); + *strrchr(msgid, '.') = '\0'; + + hval = hash(msgid, strlen(msgid)) % DIRHASH_BUCKETS; + spret = snprintf(pathname, MAXPATHLEN, "%s/%d/%s%s/%s", PATH_QUEUE, + hval, msgid, PATH_ENVELOPES, evpid); + + fp = fopen(pathname, "r"); + if (fp == NULL) + return 0; + + if (fread(messagep, sizeof(struct message), 1, fp) != 1) + fatal("queue_load_envelope: fread"); + + fclose(fp); + + return 1; +} + +int +queue_open_message_file(struct batch *batchp) +{ + int fd; + char pathname[MAXPATHLEN]; + int spret; + mode_t mode = O_RDONLY; + u_int16_t hval; + + hval = hash(batchp->message_id, strlen(batchp->message_id)) % DIRHASH_BUCKETS; + + spret = snprintf(pathname, MAXPATHLEN, "%s/%d/%s/message", PATH_QUEUE, + hval, batchp->message_id); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_open_message_file: snprintf"); + + if ((fd = open(pathname, mode)) == -1) + fatal("queue_open_message_file: open"); + + return fd; +} + +void +queue_delete_message(char *msgid) +{ + int spret; + char rootdir[MAXPATHLEN]; + char evpdir[MAXPATHLEN]; + char msgpath[MAXPATHLEN]; + u_int16_t hval; + + hval = hash(msgid, strlen(msgid)) % DIRHASH_BUCKETS; + spret = snprintf(rootdir, MAXPATHLEN, "%s/%d/%s", PATH_QUEUE, + hval, msgid); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_delete_message: snprintf"); + + spret = snprintf(evpdir, MAXPATHLEN, "%s%s", + rootdir, PATH_ENVELOPES); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_delete_message: snprintf"); + + spret = snprintf(msgpath, MAXPATHLEN, "%s/message", rootdir); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_delete_message: snprintf"); + + if (unlink(msgpath) == -1) + if (errno != ENOENT) + fatal("queue_delete_message: unlink"); + + if (rmdir(evpdir) == -1) + if (errno != ENOENT) + fatal("queue_delete_message: rmdir"); + + if (rmdir(rootdir) == -1) + if (errno != ENOENT) + fatal("queue_delete_message: rmdir"); + + spret = snprintf(rootdir, MAXPATHLEN, "%s/%d", PATH_QUEUE, + hval); + if (spret == -1 || spret >= MAXPATHLEN) + fatal("queue_delete_message: snprintf"); + + rmdir(rootdir); + + return; +} + +u_int16_t +queue_message_hash(struct message *messagep) +{ + return hash(messagep->message_id, strlen(messagep->message_id)) + % DIRHASH_BUCKETS; +} + +u_int32_t +hash(u_int8_t *buf, size_t len) +{ + u_int32_t h; + + for (h = 5381; len; len--) + h = ((h << 5) + h) + *buf++; + + return h; +} + SPLAY_GENERATE(batchtree, batch, b_nodes, batch_cmp); |