summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGilles Chehade <gilles@cvs.openbsd.org>2008-12-03 17:58:01 +0000
committerGilles Chehade <gilles@cvs.openbsd.org>2008-12-03 17:58:01 +0000
commitadad4802d329834a5277b8ae5f11b1b89c3e20f6 (patch)
treef4200b2f16292f177afebcf74f9e69d829e2c6db
parent409ada53e614d1a9e6c07f9faf58c630baca98dc (diff)
- fix event masking issues in smtp process which could lead to a fatal() if
queue process did not answer fast enough to an imsg. spotted by Jacek Masiulaniec <jacekm@dobremiasto.net> - queue layout was mostly to bootstrap the project, it does not behave good under load, it does complex things to stay in a recoverable state and it probably didnt do it too well. New queue code is simpler, smaller and allows for atomic submissions (a mail can never be in a state where it needs to be recovered). It still needs some work but works better than previous code, no regression.
-rw-r--r--usr.sbin/smtpd/queue.c1303
-rw-r--r--usr.sbin/smtpd/smtp.c24
-rw-r--r--usr.sbin/smtpd/smtp_session.c95
-rw-r--r--usr.sbin/smtpd/smtpd.c17
-rw-r--r--usr.sbin/smtpd/smtpd.h34
5 files changed, 934 insertions, 539 deletions
diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c
index aa952c596ee..9021a8df666 100644
--- a/usr.sbin/smtpd/queue.c
+++ b/usr.sbin/smtpd/queue.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: queue.c,v 1.16 2008/11/25 20:26:40 gilles Exp $ */
+/* $OpenBSD: queue.c,v 1.17 2008/12/03 17:58:00 gilles Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -54,27 +54,35 @@ void queue_dispatch_lka(int, short, void *);
void queue_setup_events(struct smtpd *);
void queue_disable_events(struct smtpd *);
void queue_timeout(int, short, void *);
-int queue_create_message_file(char *);
-void queue_delete_message_file(char *);
-int queue_record_submission(struct message *);
-int queue_remove_submission(struct message *);
-struct batch *batch_lookup(struct smtpd *, struct message *);
-int batch_schedule(struct batch *, time_t);
-void batch_unschedule(struct batch *);
-void batch_send(struct smtpd *, struct batch *, time_t);
-int queue_update_database(struct message *);
+void queue_process_runqueue(int, short, void *);
+int queue_create_incoming_layout(char *);
+int queue_record_envelope(struct message *);
+int queue_remove_envelope(struct message *);
int queue_open_message_file(struct batch *);
int queue_batch_resolved(struct smtpd *, struct batch *);
+int queue_message_schedule(struct message *, time_t);
+void queue_delete_message_file(char *);
+u_int16_t queue_message_hash(struct message *);
+int queue_record_incoming_envelope(struct message *);
+int queue_update_incoming_envelope(struct message *);
+int queue_remove_incoming_envelope(struct message *);
+int queue_commit_incoming_message(struct message *);
+void queue_delete_incoming_message(char *);
+int queue_update_envelope(struct message *);
+int queue_open_incoming_message_file(struct message *);
+void queue_process(struct smtpd *);
+int queue_process_bucket(struct smtpd *, u_int16_t);
+int queue_process_message(struct smtpd *, char *);
+void queue_process_envelope(struct smtpd *, char *, char *);
+int queue_load_envelope(struct message *, char *);
+void queue_delete_message(char *);
+
+void batch_send(struct smtpd *, struct batch *, time_t);
+u_int32_t hash(u_int8_t *, size_t);
struct batch *queue_record_batch(struct smtpd *, struct message *);
struct batch *batch_by_id(struct smtpd *, u_int64_t);
+struct batch *batch_lookup(struct smtpd *, struct message *);
struct message *message_by_id(struct smtpd *, struct batch *, u_int64_t);
-void debug_display_batch(struct batch *);
-void debug_display_message(struct message *);
-void queue_load_submissions(struct smtpd *, time_t);
-int queue_message_schedule(struct message *, time_t);
-int queue_message_from_id(char *, struct message *);
-int queue_message_complete(struct message *);
-int queue_init_submissions(void);
void
queue_sig_handler(int sig, short event, void *p)
@@ -171,28 +179,32 @@ queue_dispatch_smtp(int sig, short event, void *p)
break;
switch (imsg.hdr.type) {
- case IMSG_QUEUE_CREATE_MESSAGE_FILE: {
+ case IMSG_QUEUE_CREATE_MESSAGE: {
struct message *messagep;
struct submit_status ss;
- int fd;
log_debug("mfa_dispatch_smtp: creating message file");
messagep = imsg.data;
ss.id = messagep->session_id;
ss.code = 250;
- fd = queue_create_message_file(ss.u.msgid);
- imsg_compose(ibuf, IMSG_SMTP_MESSAGE_FILE, 0, 0, fd,
+ bzero(ss.u.msgid, MAXPATHLEN);
+
+ if (! queue_create_incoming_layout(ss.u.msgid))
+ ss.code = 421;
+
+ imsg_compose(ibuf, IMSG_SMTP_MESSAGE_ID, 0, 0, -1,
&ss, sizeof(ss));
break;
}
- case IMSG_QUEUE_DELETE_MESSAGE_FILE: {
+ case IMSG_QUEUE_REMOVE_MESSAGE: {
struct message *messagep;
messagep = imsg.data;
- queue_delete_message_file(messagep->message_id);
+ if (messagep->message_id[0] != '\0')
+ queue_delete_incoming_message(messagep->message_id);
break;
}
- case IMSG_QUEUE_MESSAGE_SUBMIT: {
+ case IMSG_QUEUE_SUBMIT_ENVELOPE: {
struct message *messagep;
struct submit_status ss;
@@ -209,13 +221,17 @@ queue_dispatch_smtp(int sig, short event, void *p)
messagep->type = T_MTA_MESSAGE;
/* Write to disk */
- queue_record_submission(messagep);
+ if (! queue_record_incoming_envelope(messagep)) {
+ ss.code = 421;
+ imsg_compose(ibuf, IMSG_SMTP_SUBMIT_ACK, 0, 0, -1,
+ &ss, sizeof(ss));
+ break;
+ }
+
imsg_compose(ibuf, IMSG_SMTP_SUBMIT_ACK, 0, 0, -1,
&ss, sizeof(ss));
if (messagep->type & T_MTA_MESSAGE) {
- messagep->flags |= F_MESSAGE_READY;
- queue_update_database(messagep);
break;
}
@@ -235,20 +251,37 @@ queue_dispatch_smtp(int sig, short event, void *p)
break;
}
- case IMSG_QUEUE_MESSAGE_COMPLETE: {
+ case IMSG_QUEUE_COMMIT_MESSAGE: {
struct message *messagep;
struct submit_status ss;
messagep = imsg.data;
ss.id = messagep->session_id;
- queue_message_complete(messagep);
+ if (! queue_commit_incoming_message(messagep))
+ ss.code = 421;
imsg_compose(ibuf, IMSG_SMTP_SUBMIT_ACK, 0, 0, -1,
&ss, sizeof(ss));
break;
}
+ case IMSG_QUEUE_MESSAGE_FILE: {
+ struct message *messagep;
+ struct submit_status ss;
+ int fd;
+
+ messagep = imsg.data;
+ ss.id = messagep->session_id;
+
+ fd = queue_open_incoming_message_file(messagep);
+ if (fd == -1)
+ ss.code = 421;
+
+ imsg_compose(ibuf, IMSG_SMTP_MESSAGE_FILE, 0, 0, fd,
+ &ss, sizeof(ss));
+ break;
+ }
default:
log_debug("queue_dispatch_smtp: unexpected imsg %d",
imsg.hdr.type);
@@ -306,7 +339,7 @@ queue_dispatch_mda(int sig, short event, void *p)
if (messagep->status & S_MESSAGE_TEMPFAILURE) {
messagep->status &= ~S_MESSAGE_TEMPFAILURE;
messagep->flags &= ~F_MESSAGE_PROCESSING;
- queue_update_database(messagep);
+ queue_update_envelope(messagep);
break;
}
@@ -319,18 +352,17 @@ queue_dispatch_mda(int sig, short event, void *p)
messagep->id = queue_generate_id();
messagep->batch_id = 0;
messagep->type |= T_DAEMON_MESSAGE;
- messagep->flags |= F_MESSAGE_READY;
messagep->lasttry = 0;
messagep->retry = 0;
- queue_record_submission(messagep);
+ queue_record_envelope(messagep);
*messagep = msave;
}
- queue_remove_submission(messagep);
+ queue_remove_envelope(messagep);
break;
}
- /* no error, remove submission */
- queue_remove_submission(messagep);
+ /* no error, remove envelope */
+ queue_remove_envelope(messagep);
break;
}
@@ -402,7 +434,7 @@ queue_dispatch_mta(int sig, short event, void *p)
if (messagep->status & S_MESSAGE_TEMPFAILURE) {
messagep->status &= ~S_MESSAGE_TEMPFAILURE;
messagep->flags &= ~F_MESSAGE_PROCESSING;
- queue_update_database(messagep);
+ queue_update_envelope(messagep);
break;
}
@@ -415,18 +447,17 @@ queue_dispatch_mta(int sig, short event, void *p)
messagep->id = queue_generate_id();
messagep->batch_id = 0;
messagep->type |= T_DAEMON_MESSAGE;
- messagep->flags |= F_MESSAGE_READY;
messagep->lasttry = 0;
messagep->retry = 0;
- queue_record_submission(messagep);
+ queue_record_envelope(messagep);
*messagep = msave;
}
- queue_remove_submission(messagep);
+ queue_remove_envelope(messagep);
break;
}
- /* no error, remove submission */
- queue_remove_submission(messagep);
+ /* no error, remove envelope */
+ queue_remove_envelope(messagep);
break;
}
@@ -483,12 +514,7 @@ queue_dispatch_lka(int sig, short event, void *p)
messagep = imsg.data;
messagep->id = queue_generate_id();
messagep->batch_id = 0;
- queue_record_submission(messagep);
-
- if (messagep->type & T_MTA_MESSAGE) {
- messagep->flags |= F_MESSAGE_READY;
- queue_update_database(messagep);
- }
+ queue_record_incoming_envelope(messagep);
if (messagep->type & T_MDA_MESSAGE) {
imsg_compose(ibuf, IMSG_LKA_FORWARD_LOOKUP, 0, 0, -1,
@@ -503,8 +529,7 @@ queue_dispatch_lka(int sig, short event, void *p)
messagep = (struct message *)imsg.data;
messagep->id = queue_generate_id();
messagep->batch_id = 0;
- messagep->flags |= F_MESSAGE_READY;
- queue_record_submission(messagep);
+ queue_record_incoming_envelope(messagep);
break;
}
@@ -512,7 +537,7 @@ queue_dispatch_lka(int sig, short event, void *p)
struct message *messagep;
messagep = (struct message *)imsg.data;
- queue_remove_submission(messagep);
+ queue_remove_incoming_envelope(messagep);
break;
}
@@ -543,9 +568,14 @@ queue_setup_events(struct smtpd *env)
struct timeval tv;
evtimer_set(&env->sc_ev, queue_timeout, env);
- tv.tv_sec = 1;
- tv.tv_usec = 0;
+ tv.tv_sec = 0;
+ tv.tv_usec = 10;
evtimer_add(&env->sc_ev, &tv);
+
+ evtimer_set(&env->sc_rqev, queue_process_runqueue, env);
+ tv.tv_sec = 0;
+ tv.tv_usec = 10;
+ evtimer_add(&env->sc_rqev, &tv);
}
void
@@ -558,12 +588,13 @@ void
queue_timeout(int fd, short event, void *p)
{
struct smtpd *env = p;
- struct batch *batchp, *nxt;
struct timeval tv;
time_t curtime;
+ struct batch *batchp, *nxt;
+
+ queue_process(env);
curtime = time(NULL);
- queue_load_submissions(env, curtime);
for (batchp = SPLAY_MIN(batchtree, &env->batch_queue);
batchp != NULL;
@@ -582,95 +613,11 @@ queue_timeout(int fd, short event, void *p)
}
- tv.tv_sec = 5;
- tv.tv_usec = 0;
+ tv.tv_sec = 0;
+ tv.tv_usec = 10;
evtimer_add(&env->sc_ev, &tv);
}
-void
-queue_load_submissions(struct smtpd *env, time_t tm)
-{
- DIR *dirp;
- struct dirent *dp;
- struct batch *batchp;
- struct message *messagep;
- struct message message;
-
- dirp = opendir(PATH_ENVELOPES);
- if (dirp == NULL)
- err(1, "opendir");
-
- while ((dp = readdir(dirp)) != NULL) {
-
- if (dp->d_name[0] == '.')
- continue;
-
- if (! queue_message_from_id(dp->d_name, &message)) {
- warnx("failed to load message \"%s\"", dp->d_name);
- continue;
- }
-
- if (! queue_message_schedule(&message, tm)) {
- if (message.flags & F_MESSAGE_EXPIRED) {
- log_debug("message expired, create mdaemon");
- queue_remove_submission(&message);
- }
- continue;
- }
-
- message.lasttry = tm;
- message.flags |= F_MESSAGE_PROCESSING;
- queue_update_database(&message);
-
- messagep = calloc(1, sizeof (struct message));
- if (messagep == NULL)
- err(1, "calloc");
- *messagep = message;
-
- batchp = batch_lookup(env, messagep);
- if (batchp != NULL)
- messagep->batch_id = batchp->id;
-
- batchp = queue_record_batch(env, messagep);
- if (messagep->batch_id == 0)
- messagep->batch_id = batchp->id;
- }
-
- closedir(dirp);
-}
-
-int
-queue_message_from_id(char *message_id, struct message *message)
-{
- int spret;
- char pathname[MAXPATHLEN];
- FILE *fp;
-
- spret = snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_ENVELOPES, message_id);
- if (spret == -1 || spret >= MAXPATHLEN) {
- warnx("queue_load_submissions: filename too long.");
- return 0;
- }
-
- fp = fopen(pathname, "r");
- if (fp == NULL) {
- warnx("queue_load_submissions: fopen: %s", message_id);
- goto bad;
- }
-
- if (fread(message, sizeof(struct message), 1, fp) != 1) {
- warnx("queue_load_submissions: fread: %s", message_id);
- goto bad;
- }
-
- fclose(fp);
- return 1;
-bad:
- if (fp != NULL)
- fclose(fp);
- return 0;
-}
-
pid_t
queue(struct smtpd *env)
{
@@ -722,9 +669,6 @@ queue(struct smtpd *env)
SPLAY_INIT(&env->batch_queue);
- queue_init_submissions();
- queue_load_submissions(env, time(NULL));
-
event_init();
signal_set(&ev_sigint, SIGINT, queue_sig_handler, env);
@@ -743,145 +687,272 @@ queue(struct smtpd *env)
return (0);
}
-u_int64_t
-queue_generate_id(void)
+void
+queue_process(struct smtpd *env)
{
- u_int64_t id;
- struct timeval tp;
+ u_int16_t cbucket = 0;
+ static u_int16_t lbucket = 0;
+ DIR *dirp;
+ struct dirent *dp;
+ const char *errstr;
+ static u_int8_t bucketdone = 1;
- if (gettimeofday(&tp, NULL) == -1)
- fatal("queue_generate_id: time");
+ if (! bucketdone) {
+ bucketdone = queue_process_bucket(env, lbucket);
+ if (bucketdone)
+ lbucket = (lbucket + 1) % DIRHASH_BUCKETS;
+ return;
+ }
- id = (u_int32_t)tp.tv_sec;
- id <<= 32;
- id |= (u_int32_t)tp.tv_usec;
- usleep(1);
+ dirp = opendir(PATH_QUEUE);
+ if (dirp == NULL)
+ fatal("queue_process: opendir");
- return (id);
+ while ((dp = readdir(dirp)) != NULL) {
+
+ if (strcmp(dp->d_name, ".") == 0 ||
+ strcmp(dp->d_name, "..") == 0)
+ continue;
+
+ cbucket = strtonum(dp->d_name, 0, DIRHASH_BUCKETS - 1, &errstr);
+ if (errstr) {
+ log_warn("queue_process: %s/%s is not a valid bucket",
+ PATH_QUEUE, dp->d_name);
+ continue;
+ }
+
+ if (cbucket == lbucket)
+ break;
+ }
+ closedir(dirp);
+
+ if (dp == NULL) {
+ lbucket = (lbucket + 1) % DIRHASH_BUCKETS;
+ return;
+ }
+
+ bucketdone = queue_process_bucket(env, cbucket);
+ if (bucketdone)
+ lbucket = (lbucket + 1) % DIRHASH_BUCKETS;
}
int
-queue_create_message_file(char *message_id)
+queue_process_bucket(struct smtpd *env, u_int16_t bucket)
{
- int fd;
- char pathname[MAXPATHLEN];
int spret;
+ static DIR *dirp = NULL;
+ struct dirent *dp;
+ static char *msgid = NULL;
+ char bucketpath[MAXPATHLEN];
+ static u_int8_t messagedone = 1;
- spret = snprintf(pathname, MAXPATHLEN, "%s/%d.XXXXXXXXXXXXXXXX",
- PATH_MESSAGES, time(NULL));
+ if (! messagedone) {
+ messagedone = queue_process_message(env, msgid);
+ if (! messagedone)
+ return 0;
+ msgid = NULL;
+ }
+
+ spret = snprintf(bucketpath, MAXPATHLEN, "%s/%d", PATH_QUEUE, bucket);
if (spret == -1 || spret >= MAXPATHLEN)
- return -1;
+ fatal("queue_process_bucket: snprintf");
+
+ if (dirp == NULL) {
+ dirp = opendir(bucketpath);
+ if (dirp == NULL)
+ fatal("queue_process_bucket: opendir");
+ }
- fd = mkstemp(pathname);
- if (fd == -1)
- fatal("queue_create_message_file: mkstemp");
+ while ((dp = readdir(dirp)) != NULL) {
- /* XXX - this won't fail if message_id is MAXPATHLEN bytes */
- if (strlcpy(message_id, pathname + sizeof(PATH_MESSAGES), MAXPATHLEN)
- >= MAXPATHLEN)
- fatal("queue_create_message_file: message id too long");
+ if (strcmp(dp->d_name, ".") == 0 ||
+ strcmp(dp->d_name, "..") == 0)
+ continue;
- return fd;
+ break;
+ }
+
+ if (dp != NULL) {
+ msgid = dp->d_name;
+ messagedone = queue_process_message(env, msgid);
+ if (! messagedone)
+ return 0;
+ msgid = NULL;
+ }
+
+ closedir(dirp);
+ dirp = NULL;
+ return 1;
}
-void
-queue_delete_message_file(char *message_id)
+int
+queue_process_message(struct smtpd *env, char *messageid)
{
- char pathname[MAXPATHLEN];
int spret;
+ static DIR *dirp = NULL;
+ struct dirent *dp;
+ char evppath[MAXPATHLEN];
+ u_int16_t hval = 0;
- spret = snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_MESSAGES, message_id);
+ hval = hash(messageid, strlen(messageid)) % DIRHASH_BUCKETS;
+
+ spret = snprintf(evppath, MAXPATHLEN, "%s/%d/%s%s", PATH_QUEUE, hval,
+ messageid, PATH_ENVELOPES);
if (spret == -1 || spret >= MAXPATHLEN)
- fatal("queue_delete_message_file: message id too long");
+ fatal("queue_process_message: snprintf");
- if (unlink(pathname) == -1)
- fatal("queue_delete_message_file: unlink");
+ if (dirp == NULL) {
+ dirp = opendir(evppath);
+ if (dirp == NULL)
+ fatal("queue_process_message: opendir");
+ }
+
+ while ((dp = readdir(dirp)) != NULL) {
+
+ if (strcmp(dp->d_name, ".") == 0 ||
+ strcmp(dp->d_name, "..") == 0)
+ continue;
+ break;
+ }
+
+ if (dp != NULL) {
+ queue_process_envelope(env, messageid, dp->d_name);
+ return 0;
+ }
+
+ closedir(dirp);
+ dirp = NULL;
+ return 1;
}
-int
-queue_record_submission(struct message *message)
+void
+queue_process_envelope(struct smtpd *env, char *msgid, char *evpid)
{
- char pathname[MAXPATHLEN];
- char linkname[MAXPATHLEN];
- char dbname[MAXPATHLEN];
- char message_uid[MAXPATHLEN];
- char *spool;
- size_t spoolsz;
- int fd;
- int mode = O_CREAT|O_TRUNC|O_WRONLY|O_EXCL|O_SYNC;
int spret;
- FILE *fp;
+ struct message message;
+ time_t tm;
+ char evppath[MAXPATHLEN];
+ char rqpath[MAXPATHLEN];
+ u_int16_t hval;
+ struct stat sb;
- if (message->type & T_DAEMON_MESSAGE) {
- spool = PATH_DAEMON;
+ if (! queue_load_envelope(&message, evpid)) {
+ log_debug("failed to load envelope: %s", evpid);
+ return;
}
- else {
- switch (message->recipient.rule.r_action) {
- case A_MBOX:
- case A_MAILDIR:
- case A_EXT:
- spool = PATH_LOCAL;
- break;
- default:
- spool = PATH_RELAY;
+
+ tm = time(NULL);
+
+ if (! queue_message_schedule(&message, tm)) {
+ if (message.flags & F_MESSAGE_EXPIRED) {
+ log_debug("message has expired, mdaemon");
+ queue_remove_envelope(&message);
}
+ return;
}
- spoolsz = strlen(spool);
- spret = snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_MESSAGES,
- message->message_id);
+ message.flags |= F_MESSAGE_SCHEDULED;
+ queue_update_envelope(&message);
+
+ log_debug("SCHEDULED: %s", evpid);
+ hval = hash(msgid, strlen(msgid)) % DIRHASH_BUCKETS;
+ spret = snprintf(evppath, MAXPATHLEN, "%s/%d/%s%s/%s", PATH_QUEUE, hval,
+ msgid, PATH_ENVELOPES, evpid);
if (spret == -1 || spret >= MAXPATHLEN)
- fatal("queue_record_submission: message id too long");
+ fatal("queue_process_envelope: snprintf");
- for (;;) {
- spret = snprintf(linkname, MAXPATHLEN, "%s/%s.%qu", spool,
- message->message_id, (u_int64_t)arc4random());
- if (spret == -1 || spret >= MAXPATHLEN)
- fatal("queue_record_submission: message uid too long");
+ spret = snprintf(rqpath, MAXPATHLEN, "%s/%s", PATH_RUNQUEUE, evpid);
+ if (spret == -1 || spret >= MAXPATHLEN)
+ fatal("queue_process_envelope: snprintf");
- (void)strlcpy(message_uid, linkname + spoolsz + 1, MAXPATHLEN);
+ if (stat(rqpath, &sb) == -1) {
+ if (errno != ENOENT)
+ fatal("queue_process_envelope: stat");
- if (link(pathname, linkname) == -1) {
- if (errno == EEXIST)
- continue;
- err(1, "link: %s , %s", pathname, linkname);
+ if (symlink(evppath, rqpath) == -1) {
+ log_info("queue_process_envelope: "
+ "failed to place envelope in runqueue");
}
+ }
+}
- spret = snprintf(dbname, MAXPATHLEN, "%s/%s", PATH_ENVELOPES,
- message_uid);
- if (spret == -1 || spret >= MAXPATHLEN)
- fatal("queue_record_submission: database uid too long");
-
- fd = open(dbname, mode, 0600);
- if (fd == -1)
- if (unlink(linkname) == -1)
- fatal("queue_record_submission: unlink");
+void
+queue_process_runqueue(int fd, short event, void *p)
+{
+ DIR *dirp;
+ struct dirent *dp;
+ struct message message;
+ struct message *messagep;
+ struct batch *batchp;
+ char pathname[MAXPATHLEN];
+ time_t tm;
+ struct smtpd *env = p;
+ struct timeval tv;
- if (flock(fd, LOCK_EX) == -1)
- fatal("queue_record_submission: flock");
+ tm = time(NULL);
- fp = fdopen(fd, "w");
- if (fp == NULL)
- fatal("fdopen");
+ dirp = opendir(PATH_RUNQUEUE);
+ if (dirp == NULL)
+ fatal("queue_process_runqueue: opendir");
- if (strlcpy(message->message_uid, message_uid, MAXPATHLEN)
- >= MAXPATHLEN)
- fatal("queue_record_submission: message uid too long");
+ while ((dp = readdir(dirp)) != NULL) {
+ if (strcmp(dp->d_name, ".") == 0 ||
+ strcmp(dp->d_name, "..") == 0)
+ continue;
- message->creation = time(NULL);
+ /* XXX */
+ snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_RUNQUEUE, dp->d_name);
+ unlink(pathname);
- if (fwrite(message, sizeof(struct message), 1, fp) != 1) {
- fclose(fp);
- unlink(dbname);
- return 0;
+ if (! queue_load_envelope(&message, dp->d_name)) {
+ log_debug("failed to load envelope");
+ continue;
}
- fflush(fp);
- fsync(fd);
- fclose(fp);
- break;
+ if (message.flags & F_MESSAGE_PROCESSING)
+ continue;
+
+ message.lasttry = tm;
+ message.flags &= ~F_MESSAGE_SCHEDULED;
+ message.flags |= F_MESSAGE_PROCESSING;
+ queue_update_envelope(&message);
+
+ messagep = calloc(1, sizeof (struct message));
+ if (messagep == NULL)
+ err(1, "calloc");
+ *messagep = message;
+
+ batchp = batch_lookup(env, messagep);
+ if (batchp != NULL)
+ messagep->batch_id = batchp->id;
+
+ batchp = queue_record_batch(env, messagep);
+ if (messagep->batch_id == 0)
+ messagep->batch_id = batchp->id;
}
- return 1;
+
+ closedir(dirp);
+
+ tv.tv_sec = 0;
+ tv.tv_usec = 10;
+ evtimer_add(&env->sc_rqev, &tv);
+}
+
+u_int64_t
+queue_generate_id(void)
+{
+ u_int64_t id;
+ struct timeval tp;
+
+ if (gettimeofday(&tp, NULL) == -1)
+ fatal("queue_generate_id: time");
+
+ id = (u_int32_t)tp.tv_sec;
+ id <<= 32;
+ id |= (u_int32_t)tp.tv_usec;
+ usleep(1);
+
+ return (id);
}
struct batch *
@@ -941,71 +1012,6 @@ queue_record_batch(struct smtpd *env, struct message *messagep)
}
int
-queue_remove_submission(struct message *message)
-{
- char pathname[MAXPATHLEN];
- char linkname[MAXPATHLEN];
- char dbname[MAXPATHLEN];
- char *spool;
- struct stat sb;
- int spret;
-
- if (message->type & T_DAEMON_MESSAGE) {
- spool = PATH_DAEMON;
- }
- else {
- switch (message->recipient.rule.r_action) {
- case A_MBOX:
- case A_MAILDIR:
- case A_EXT:
- spool = PATH_LOCAL;
- break;
- default:
- spool = PATH_RELAY;
- }
- }
-
- spret = snprintf(dbname, MAXPATHLEN, "%s/%s", PATH_ENVELOPES,
- message->message_uid);
- if (spret == -1 || spret >= MAXPATHLEN)
- fatal("queue_remove_submission: database uid too long");
-
- spret = snprintf(linkname, MAXPATHLEN, "%s/%s", spool,
- message->message_uid);
- if (spret == -1 || spret >= MAXPATHLEN)
- fatal("queue_remove_submission: message uid too long");
-
- spret = snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_MESSAGES,
- message->message_id);
- if (spret == -1 || spret >= MAXPATHLEN)
- fatal("queue_remove_submission: message id too long");
-
- if (unlink(dbname) == -1) {
- warnx("dbname: %s", dbname);
- fatal("queue_remove_submission: unlink");
- }
-
- if (unlink(linkname) == -1) {
- warnx("linkname: %s", linkname);
- fatal("queue_remove_submission: unlink");
- }
-
- if (stat(pathname, &sb) == -1) {
- warnx("pathname: %s", pathname);
- fatal("queue_remove_submission: stat");
- }
-
- if (sb.st_nlink == 1) {
- if (unlink(pathname) == -1) {
- warnx("pathname: %s", pathname);
- fatal("queue_remove_submission: unlink");
- }
- }
-
- return 1;
-}
-
-int
queue_remove_batch_message(struct smtpd *env, struct batch *batchp, struct message *messagep)
{
TAILQ_REMOVE(&batchp->messages, messagep, entry);
@@ -1082,75 +1088,6 @@ queue_batch_resolved(struct smtpd *env, struct batch *lookup)
return 1;
}
-int
-queue_open_message_file(struct batch *batch)
-{
- int fd;
- char pathname[MAXPATHLEN];
- int spret;
-
- spret = snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_MESSAGES,
- batch->message_id);
- if (spret == -1 || spret >= MAXPATHLEN)
- fatal("queue_open_message_file: message id too long");
-
- fd = open(pathname, O_RDONLY);
- if (fd == -1)
- fatal("queue_open_message_file: open");
-
- return fd;
-}
-
-int
-queue_update_database(struct message *message)
-{
- int fd;
- char *spool;
- char pathname[MAXPATHLEN];
- int spret;
- FILE *fp;
- mode_t mode = O_RDWR;
-
- if (message->type & T_DAEMON_MESSAGE) {
- spool = PATH_DAEMON;
- }
- else {
- switch (message->recipient.rule.r_action) {
- case A_MBOX:
- case A_MAILDIR:
- case A_EXT:
- spool = PATH_LOCAL;
- break;
- default:
- spool = PATH_RELAY;
- }
- }
-
- spret = snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_ENVELOPES,
- message->message_uid);
- if (spret == -1 || spret >= MAXPATHLEN)
- fatal("queue_update_database: pathname too long");
-
- if ((fd = open(pathname, mode)) == -1)
- fatal("queue_update_database: cannot open database");
-
-
- if (flock(fd, LOCK_EX) == -1)
- fatal("queue_update_database: flock");
-
- fp = fdopen(fd, "w");
- if (fp == NULL)
- fatal("fdopen");
-
- if (fwrite(message, sizeof(struct message), 1, fp) != 1)
- fatal("queue_update_database: cannot write database");
- fflush(fp);
- fsync(fd);
- fclose(fp);
-
- return 1;
-}
-
struct batch *
batch_lookup(struct smtpd *env, struct message *message)
{
@@ -1201,98 +1138,6 @@ batch_cmp(struct batch *s1, struct batch *s2)
}
int
-queue_init_submissions(void)
-{
- DIR *dirp;
- struct dirent *dp;
- struct message message;
- char pathname[MAXPATHLEN];
- FILE *fp;
- int spret;
-
- dirp = opendir(PATH_ENVELOPES);
- if (dirp == NULL)
- err(1, "opendir");
-
- while ((dp = readdir(dirp)) != NULL) {
-
- if (dp->d_name[0] == '.')
- continue;
-
- spret = snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_ENVELOPES,
- dp->d_name);
- if (spret == -1 || spret >= MAXPATHLEN)
- continue;
-
- fp = fopen(pathname, "r");
- if (fp == NULL)
- continue;
-
- if (fread(&message, sizeof(struct message), 1, fp) != 1) {
- fclose(fp);
- continue;
- }
- fclose(fp);
-
- if ((message.flags & F_MESSAGE_COMPLETE) == 0)
- unlink(pathname);
- else {
- message.flags &= ~F_MESSAGE_PROCESSING;
- queue_update_database(&message);
- }
- }
-
- closedir(dirp);
- return 1;
-}
-
-int
-queue_message_complete(struct message *messagep)
-{
- DIR *dirp;
- struct dirent *dp;
- struct message message;
- char pathname[MAXPATHLEN];
- FILE *fp;
- int spret;
-
- dirp = opendir(PATH_ENVELOPES);
- if (dirp == NULL)
- err(1, "opendir");
-
- while ((dp = readdir(dirp)) != NULL) {
-
- if (dp->d_name[0] == '.')
- continue;
-
- if (strncmp(messagep->message_id,
- dp->d_name, strlen(messagep->message_id)) != 0)
- continue;
-
- spret = snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_ENVELOPES,
- dp->d_name);
- if (spret == -1 || spret >= MAXPATHLEN)
- continue;
-
- fp = fopen(pathname, "r");
- if (fp == NULL)
- continue;
-
- if (fread(&message, sizeof(struct message), 1, fp) != 1) {
- fclose(fp);
- continue;
- }
- fclose(fp);
-
- message.flags |= F_MESSAGE_COMPLETE;
- queue_update_database(&message);
- }
-
- closedir(dirp);
- return 1;
-}
-
-int
queue_message_schedule(struct message *messagep, time_t tm)
{
time_t delay;
@@ -1307,11 +1152,8 @@ queue_message_schedule(struct message *messagep, time_t tm)
messagep->flags |= F_MESSAGE_EXPIRED;
return 0;
}
-
- if ((messagep->flags & F_MESSAGE_READY) == 0)
- return 0;
-
- if ((messagep->flags & F_MESSAGE_COMPLETE) == 0)
+
+ if ((messagep->flags & F_MESSAGE_SCHEDULED) != 0)
return 0;
if ((messagep->flags & F_MESSAGE_PROCESSING) != 0)
@@ -1347,12 +1189,6 @@ queue_message_schedule(struct message *messagep, time_t tm)
}
void
-batch_unschedule(struct batch *batchp)
-{
- batchp->flags &= ~(F_BATCH_SCHEDULED);
-}
-
-void
batch_send(struct smtpd *env, struct batch *batchp, time_t curtime)
{
u_int8_t proctype;
@@ -1390,6 +1226,7 @@ batch_by_id(struct smtpd *env, u_int64_t id)
return SPLAY_FIND(batchtree, &env->batch_queue, &lookup);
}
+
struct message *
message_by_id(struct smtpd *env, struct batch *batchp, u_int64_t id)
{
@@ -1412,4 +1249,520 @@ message_by_id(struct smtpd *env, struct batch *batchp, u_int64_t id)
return NULL;
}
+int
+queue_create_incoming_layout(char *message_id)
+{
+ int spret;
+ char rootdir[MAXPATHLEN];
+ char evpdir[MAXPATHLEN];
+
+ spret = snprintf(rootdir, MAXPATHLEN, "%s/%d.XXXXXXXXXXXXXXXX",
+ PATH_INCOMING, time(NULL));
+ if (spret == -1 || spret >= MAXPATHLEN)
+ return -1;
+
+ if (mkdtemp(rootdir) == NULL)
+ return -1;
+
+ if (strlcpy(message_id, rootdir + strlen(PATH_INCOMING) + 1, MAXPATHLEN)
+ >= MAXPATHLEN)
+ goto badroot;
+
+ spret = snprintf(evpdir, MAXPATHLEN, "%s%s",
+ rootdir, PATH_ENVELOPES);
+ if (spret == -1 || spret >= MAXPATHLEN)
+ goto badroot;
+
+ if (mkdir(evpdir, 0700) == -1)
+ goto badroot;
+
+ return 1;
+
+badroot:
+ if (rmdir(rootdir) == -1)
+ fatal("queue_create_incoming_layout: rmdir");
+
+ return 0;
+}
+
+void
+queue_delete_incoming_message(char *message_id)
+{
+ int spret;
+ char rootdir[MAXPATHLEN];
+ char evpdir[MAXPATHLEN];
+ char evppath[MAXPATHLEN];
+ char msgpath[MAXPATHLEN];
+ DIR *dirp;
+ struct dirent *dp;
+
+ spret = snprintf(rootdir, MAXPATHLEN, "%s/%s", PATH_INCOMING,
+ message_id);
+ if (spret == -1 || spret >= MAXPATHLEN)
+ fatal("queue_delete_incoming_message: snprintf");
+
+ spret = snprintf(evpdir, MAXPATHLEN, "%s%s",
+ rootdir, PATH_ENVELOPES);
+ if (spret == -1 || spret >= MAXPATHLEN)
+ fatal("queue_delete_incoming_message: snprintf");
+
+ spret = snprintf(msgpath, MAXPATHLEN, "%s/message", rootdir);
+ if (spret == -1 || spret >= MAXPATHLEN)
+ fatal("queue_delete_incoming_message: snprintf");
+
+ if (unlink(msgpath) == -1) {
+ if (errno != ENOENT)
+ fatal("queue_delete_incoming_message: unlink");
+ }
+
+ log_debug("evpdir: %s", evpdir);
+ dirp = opendir(evpdir);
+ if (dirp == NULL)
+ fatal("queue_delete_incoming_message: opendir");
+ while ((dp = readdir(dirp)) != NULL) {
+ if (strcmp(dp->d_name, ".") == 0 ||
+ strcmp(dp->d_name, "..") == 0)
+ continue;
+ spret = snprintf(evppath, MAXPATHLEN, "%s/%s", evpdir, dp->d_name);
+ if (spret == -1 || spret >= MAXPATHLEN)
+ fatal("queue_create_incoming_message: snprintf");
+
+ if (unlink(evppath) == -1) {
+ if (errno != ENOENT)
+ fatal("queue_create_incoming_message: unlink");
+ }
+ }
+ closedir(dirp);
+
+ if (rmdir(evpdir) == -1)
+ if (errno != ENOENT)
+ fatal("queue_create_incoming_message: rmdir");
+
+ if (rmdir(rootdir) == -1)
+ if (errno != ENOENT)
+ fatal("queue_create_incoming_message: rmdir");
+
+ return;
+}
+
+int
+queue_record_incoming_envelope(struct message *message)
+{
+ char rootdir[MAXPATHLEN];
+ char evpdir[MAXPATHLEN];
+ char evpname[MAXPATHLEN];
+ char message_uid[MAXPATHLEN];
+ int fd;
+ int mode = O_CREAT|O_TRUNC|O_WRONLY|O_EXCL|O_SYNC;
+ int spret;
+ FILE *fp;
+ int ret;
+
+ spret = snprintf(rootdir, MAXPATHLEN, "%s/%s", PATH_INCOMING,
+ message->message_id);
+ if (spret == -1 || spret >= MAXPATHLEN)
+ fatal("queue_record_incoming_envelope: snprintf");
+
+ spret = snprintf(evpdir, MAXPATHLEN, "%s%s", rootdir, PATH_ENVELOPES);
+ if (spret == -1 || spret >= MAXPATHLEN)
+ fatal("queue_record_incoming_envelope: snprintf");
+
+ for (;;) {
+ spret = snprintf(evpname, MAXPATHLEN, "%s/%s.%qu", evpdir,
+ message->message_id, (u_int64_t)arc4random());
+ if (spret == -1 || spret >= MAXPATHLEN)
+ fatal("queue_record_incoming_envelope: snprintf");
+
+ (void)strlcpy(message_uid, evpname + strlen(evpdir) + 1, MAXPATHLEN);
+
+ fd = open(evpname, mode, 0600);
+ if (fd == -1) {
+ if (errno == EEXIST)
+ continue;
+ return 0;
+ }
+
+ if (flock(fd, LOCK_EX) == -1)
+ fatal("queue_record_submission: flock");
+
+ fp = fdopen(fd, "w");
+ if (fp == NULL)
+ fatal("fdopen");
+
+ if (strlcpy(message->message_uid, message_uid, MAXPATHLEN)
+ >= MAXPATHLEN)
+ fatal("queue_record_submission: strlcpy");
+
+ message->creation = time(NULL);
+
+ if ((ret = fwrite(message, sizeof (struct message), 1, fp)) != 1) {
+ fclose(fp);
+ unlink(evpname);
+ return 0;
+ }
+ fflush(fp);
+ fsync(fd);
+ fclose(fp);
+
+ break;
+ }
+ return 1;
+}
+
+int
+queue_update_incoming_envelope(struct message *messagep)
+{
+ int fd;
+ char pathname[MAXPATHLEN];
+ int spret;
+ FILE *fp;
+ mode_t mode = O_RDWR;
+
+ spret = snprintf(pathname, MAXPATHLEN, "%s/%s%s/%s", PATH_INCOMING,
+ messagep->message_id, PATH_ENVELOPES, messagep->message_uid);
+ if (spret == -1 || spret >= MAXPATHLEN)
+ fatal("queue_update_incoming_envelope: snprintf");
+
+ if ((fd = open(pathname, mode)) == -1)
+ fatal("queue_update_incoming_envelope: open");
+
+ if (flock(fd, LOCK_EX) == -1)
+ fatal("queue_update_incoming_envelope: flock");
+
+ fp = fdopen(fd, "w");
+ if (fp == NULL)
+ fatal("queue_update_incoming_envelope: fdopen");
+
+ if (fwrite(messagep, sizeof(struct message), 1, fp) != 1)
+ fatal("queue_update_incoming_envelope: fwrite");
+ fflush(fp);
+ fsync(fd);
+ fclose(fp);
+
+ return 1;
+}
+
+int
+queue_remove_incoming_envelope(struct message *message)
+{
+ char pathname[MAXPATHLEN];
+ int spret;
+
+ spret = snprintf(pathname, MAXPATHLEN, "%s/%s%s/%s", PATH_INCOMING,
+ message->message_id, PATH_ENVELOPES, message->message_uid);
+ if (spret == -1 || spret >= MAXPATHLEN)
+ fatal("queue_remove_incoming_envelope: snprintf");
+
+ if (unlink(pathname) == -1)
+ if (errno != ENOENT)
+ fatal("queue_remove_incoming_envelope: unlink");
+
+ return 1;
+}
+
+int
+queue_commit_incoming_message(struct message *messagep)
+{
+ int spret;
+ char rootdir[MAXPATHLEN];
+ char queuedir[MAXPATHLEN];
+ u_int16_t hval;
+
+ spret = snprintf(rootdir, MAXPATHLEN, "%s/%s", PATH_INCOMING,
+ messagep->message_id);
+ if (spret == -1 || spret >= MAXPATHLEN)
+ fatal("queue_commit_message_incoming: snprintf");
+
+ hval = queue_message_hash(messagep);
+
+ spret = snprintf(queuedir, MAXPATHLEN, "%s/%d", PATH_QUEUE, hval);
+ if (spret == -1 || spret >= MAXPATHLEN)
+ fatal("queue_commit_message_incoming: snprintf");
+
+ if (mkdir(queuedir, 0700) == -1) {
+ if (errno == ENOSPC)
+ return 0;
+ if (errno != EEXIST)
+ fatal("queue_commit_message_incoming: mkdir");
+ }
+
+ spret = snprintf(queuedir, MAXPATHLEN, "%s/%d/%s", PATH_QUEUE, hval,
+ messagep->message_id);
+ if (spret == -1 || spret >= MAXPATHLEN)
+ fatal("queue_commit_message_incoming: snprintf");
+
+
+ if (rename(rootdir, queuedir) == -1)
+ fatal("queue_commit_message_incoming: rename");
+
+ return 1;
+}
+
+int
+queue_open_incoming_message_file(struct message *messagep)
+{
+ char pathname[MAXPATHLEN];
+ int spret;
+ mode_t mode = O_CREAT|O_EXCL|O_RDWR;
+
+ spret = snprintf(pathname, MAXPATHLEN, "%s/%s/message", PATH_INCOMING,
+ messagep->message_id);
+ if (spret == -1 || spret >= MAXPATHLEN)
+ fatal("queue_open_incoming_message_file: snprintf");
+
+ return open(pathname, mode, 0600);
+}
+
+int
+queue_record_envelope(struct message *messagep)
+{
+ char queuedir[MAXPATHLEN];
+ char evpdir[MAXPATHLEN];
+ char evpname[MAXPATHLEN];
+ char message_uid[MAXPATHLEN];
+ int fd;
+ int mode = O_CREAT|O_TRUNC|O_WRONLY|O_EXCL|O_SYNC;
+ int spret;
+ FILE *fp;
+ int ret;
+ u_int16_t hval;
+
+ spret = snprintf(queuedir, MAXPATHLEN, "%s/%s", PATH_QUEUE,
+ messagep->message_id);
+ if (spret == -1 || spret >= MAXPATHLEN)
+ fatal("queue_record_envelope: snprintf");
+
+ hval = queue_message_hash(messagep);
+
+ spret = snprintf(queuedir, MAXPATHLEN, "%s/%d", PATH_QUEUE, hval);
+ if (spret == -1 || spret >= MAXPATHLEN)
+ fatal("queue_record_envelope: snprintf");
+
+ spret = snprintf(evpdir, MAXPATHLEN, "%s/%s%s", queuedir, messagep->message_id,
+ PATH_ENVELOPES);
+ if (spret == -1 || spret >= MAXPATHLEN)
+ fatal("queue_record_envelope: snprintf");
+
+ for (;;) {
+ spret = snprintf(evpname, MAXPATHLEN, "%s/%s.%qu", evpdir,
+ messagep->message_id, (u_int64_t)arc4random());
+ if (spret == -1 || spret >= MAXPATHLEN)
+ fatal("queue_record_envelope: snprintf");
+
+ (void)strlcpy(message_uid, evpname + strlen(evpdir) + 1, MAXPATHLEN);
+
+ fd = open(evpname, mode, 0600);
+ if (fd == -1) {
+ if (errno == EEXIST)
+ continue;
+ log_debug("failed to open %s", evpname);
+ fatal("queue_record_envelope: open");
+ }
+
+ if (flock(fd, LOCK_EX) == -1)
+ fatal("queue_record_envelope: flock");
+
+ fp = fdopen(fd, "w");
+ if (fp == NULL)
+ fatal("fdopen");
+
+ if (strlcpy(messagep->message_uid, message_uid, MAXPATHLEN)
+ >= MAXPATHLEN)
+ fatal("queue_record_envelope: strlcpy");
+
+ messagep->creation = time(NULL);
+
+ if ((ret = fwrite(messagep, sizeof (struct message), 1, fp)) != 1) {
+ fclose(fp);
+ unlink(evpname);
+ return 0;
+ }
+ fflush(fp);
+ fsync(fd);
+ fclose(fp);
+
+ break;
+ }
+ return 1;
+
+}
+
+int
+queue_remove_envelope(struct message *messagep)
+{
+ char pathname[MAXPATHLEN];
+ u_int16_t hval;
+ int spret;
+
+ hval = queue_message_hash(messagep);
+
+ spret = snprintf(pathname, MAXPATHLEN, "%s/%d/%s%s/%s", PATH_QUEUE,
+ hval, messagep->message_id, PATH_ENVELOPES, messagep->message_uid);
+ if (spret == -1 || spret >= MAXPATHLEN)
+ fatal("queue_remove_incoming_envelope: snprintf");
+
+ if (unlink(pathname) == -1)
+ fatal("queue_remove_incoming_envelope: unlink");
+
+ spret = snprintf(pathname, MAXPATHLEN, "%s/%d/%s%s", PATH_QUEUE,
+ hval, messagep->message_id, PATH_ENVELOPES);
+ if (spret == -1 || spret >= MAXPATHLEN)
+ fatal("queue_remove_incoming_envelope: snprintf");
+
+ if (rmdir(pathname) != -1)
+ queue_delete_message(messagep->message_id);
+
+ return 1;
+}
+
+int
+queue_update_envelope(struct message *messagep)
+{
+ int fd;
+ char pathname[MAXPATHLEN];
+ int spret;
+ FILE *fp;
+ mode_t mode = O_RDWR;
+ u_int16_t hval;
+
+ hval = queue_message_hash(messagep);
+
+ spret = snprintf(pathname, MAXPATHLEN, "%s/%d/%s%s/%s", PATH_QUEUE,
+ hval, messagep->message_id, PATH_ENVELOPES, messagep->message_uid);
+ if (spret == -1 || spret >= MAXPATHLEN)
+ fatal("queue_update_envelope: snprintf");
+
+ if ((fd = open(pathname, mode)) == -1)
+ fatal("queue_update_envelope: open");
+
+ if (flock(fd, LOCK_EX) == -1)
+ fatal("queue_update_envelope: flock");
+
+ fp = fdopen(fd, "w");
+ if (fp == NULL)
+ fatal("queue_update_envelope: fdopen");
+
+ if (fwrite(messagep, sizeof(struct message), 1, fp) != 1)
+ fatal("queue_update_envelope: fwrite");
+ fflush(fp);
+ fsync(fd);
+ fclose(fp);
+
+ return 1;
+}
+
+int
+queue_load_envelope(struct message *messagep, char *evpid)
+{
+ int spret;
+ char pathname[MAXPATHLEN];
+ u_int16_t hval;
+ FILE *fp;
+ char msgid[MAXPATHLEN];
+
+ strlcpy(msgid, evpid, MAXPATHLEN);
+ *strrchr(msgid, '.') = '\0';
+
+ hval = hash(msgid, strlen(msgid)) % DIRHASH_BUCKETS;
+ spret = snprintf(pathname, MAXPATHLEN, "%s/%d/%s%s/%s", PATH_QUEUE,
+ hval, msgid, PATH_ENVELOPES, evpid);
+
+ fp = fopen(pathname, "r");
+ if (fp == NULL)
+ return 0;
+
+ if (fread(messagep, sizeof(struct message), 1, fp) != 1)
+ fatal("queue_load_envelope: fread");
+
+ fclose(fp);
+
+ return 1;
+}
+
+int
+queue_open_message_file(struct batch *batchp)
+{
+ int fd;
+ char pathname[MAXPATHLEN];
+ int spret;
+ mode_t mode = O_RDONLY;
+ u_int16_t hval;
+
+ hval = hash(batchp->message_id, strlen(batchp->message_id)) % DIRHASH_BUCKETS;
+
+ spret = snprintf(pathname, MAXPATHLEN, "%s/%d/%s/message", PATH_QUEUE,
+ hval, batchp->message_id);
+ if (spret == -1 || spret >= MAXPATHLEN)
+ fatal("queue_open_message_file: snprintf");
+
+ if ((fd = open(pathname, mode)) == -1)
+ fatal("queue_open_message_file: open");
+
+ return fd;
+}
+
+void
+queue_delete_message(char *msgid)
+{
+ int spret;
+ char rootdir[MAXPATHLEN];
+ char evpdir[MAXPATHLEN];
+ char msgpath[MAXPATHLEN];
+ u_int16_t hval;
+
+ hval = hash(msgid, strlen(msgid)) % DIRHASH_BUCKETS;
+ spret = snprintf(rootdir, MAXPATHLEN, "%s/%d/%s", PATH_QUEUE,
+ hval, msgid);
+ if (spret == -1 || spret >= MAXPATHLEN)
+ fatal("queue_delete_message: snprintf");
+
+ spret = snprintf(evpdir, MAXPATHLEN, "%s%s",
+ rootdir, PATH_ENVELOPES);
+ if (spret == -1 || spret >= MAXPATHLEN)
+ fatal("queue_delete_message: snprintf");
+
+ spret = snprintf(msgpath, MAXPATHLEN, "%s/message", rootdir);
+ if (spret == -1 || spret >= MAXPATHLEN)
+ fatal("queue_delete_message: snprintf");
+
+ if (unlink(msgpath) == -1)
+ if (errno != ENOENT)
+ fatal("queue_delete_message: unlink");
+
+ if (rmdir(evpdir) == -1)
+ if (errno != ENOENT)
+ fatal("queue_delete_message: rmdir");
+
+ if (rmdir(rootdir) == -1)
+ if (errno != ENOENT)
+ fatal("queue_delete_message: rmdir");
+
+ spret = snprintf(rootdir, MAXPATHLEN, "%s/%d", PATH_QUEUE,
+ hval);
+ if (spret == -1 || spret >= MAXPATHLEN)
+ fatal("queue_delete_message: snprintf");
+
+ rmdir(rootdir);
+
+ return;
+}
+
+u_int16_t
+queue_message_hash(struct message *messagep)
+{
+ return hash(messagep->message_id, strlen(messagep->message_id))
+ % DIRHASH_BUCKETS;
+}
+
+u_int32_t
+hash(u_int8_t *buf, size_t len)
+{
+ u_int32_t h;
+
+ for (h = 5381; len; len--)
+ h = ((h << 5) + h) + *buf++;
+
+ return h;
+}
+
SPLAY_GENERATE(batchtree, batch, b_nodes, batch_cmp);
diff --git a/usr.sbin/smtpd/smtp.c b/usr.sbin/smtpd/smtp.c
index 7e456a84508..a85baa45256 100644
--- a/usr.sbin/smtpd/smtp.c
+++ b/usr.sbin/smtpd/smtp.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: smtp.c,v 1.7 2008/11/24 22:30:19 gilles Exp $ */
+/* $OpenBSD: smtp.c,v 1.8 2008/12/03 17:58:00 gilles Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -358,11 +358,10 @@ smtp_dispatch_queue(int sig, short event, void *p)
break;
switch (imsg.hdr.type) {
- case IMSG_SMTP_MESSAGE_FILE: {
+ case IMSG_SMTP_MESSAGE_ID: {
struct submit_status *ss;
struct session *s;
struct session key;
- int fd;
log_debug("smtp_dispatch_queue: queue handled message creation");
ss = imsg.data;
@@ -377,6 +376,25 @@ smtp_dispatch_queue(int sig, short event, void *p)
(void)strlcpy(s->s_msg.message_id, ss->u.msgid,
sizeof(s->s_msg.message_id));
+ session_pickup(s, ss);
+ break;
+ }
+ case IMSG_SMTP_MESSAGE_FILE: {
+ struct submit_status *ss;
+ struct session *s;
+ struct session key;
+ int fd;
+
+ log_debug("smtp_dispatch_queue: queue handled message creation");
+ ss = imsg.data;
+
+ key.s_id = ss->id;
+
+ s = SPLAY_FIND(sessiontree, &env->sc_sessions, &key);
+ if (s == NULL) {
+ /* Session was removed while we were waiting for the message */
+ break;
+ }
fd = imsg_get_fd(ibuf, &imsg);
if (fd != -1) {
diff --git a/usr.sbin/smtpd/smtp_session.c b/usr.sbin/smtpd/smtp_session.c
index c9ba7ed8815..96294317722 100644
--- a/usr.sbin/smtpd/smtp_session.c
+++ b/usr.sbin/smtpd/smtp_session.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: smtp_session.c,v 1.13 2008/11/25 20:35:54 gilles Exp $ */
+/* $OpenBSD: smtp_session.c,v 1.14 2008/12/03 17:58:00 gilles Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -403,7 +403,6 @@ session_rfc5321_rcpt_handler(struct session *s, char *args)
}
mr.id = s->s_msg.id;
-
s->s_state = S_RCPTREQUEST;
mr.ss = s->s_ss;
@@ -446,8 +445,10 @@ session_rfc5321_data_handler(struct session *s, char *args)
}
s->s_state = S_DATA;
- session_pickup(s, NULL);
-
+ imsg_compose(s->s_env->sc_ibufs[PROC_QUEUE],
+ IMSG_QUEUE_MESSAGE_FILE, 0, 0, -1, &s->s_msg,
+ sizeof(s->s_msg));
+ bufferevent_disable(s->s_bev, EV_READ);
return 1;
}
@@ -496,6 +497,8 @@ session_command(struct session *s, char *cmd, char *args)
{
int i;
+ bufferevent_enable(s->s_bev, EV_WRITE);
+
if (!(s->s_flags & F_EHLO))
goto rfc5321;
@@ -548,8 +551,10 @@ session_pickup(struct session *s, struct submit_status *ss)
if (s == NULL)
fatal("session_pickup: desynchronized");
- bufferevent_disable(s->s_bev, EV_READ);
- bufferevent_enable(s->s_bev, EV_WRITE);
+ bufferevent_enable(s->s_bev, EV_READ|EV_WRITE);
+
+ if (ss != NULL && ss->code == 421)
+ goto tempfail;
switch (s->s_state) {
case S_INIT:
@@ -564,9 +569,9 @@ session_pickup(struct session *s, struct submit_status *ss)
break;
case S_TLS:
+ bufferevent_disable(s->s_bev, EV_READ|EV_WRITE);
s->s_state = S_GREETED;
ssl_session_init(s);
- bufferevent_disable(s->s_bev, EV_READ|EV_WRITE);
break;
case S_AUTH:
@@ -592,24 +597,24 @@ session_pickup(struct session *s, struct submit_status *ss)
s->s_state = S_MAIL;
s->s_msg.sender = ss->u.path;
+ if (s->s_msg.datafp != NULL) {
+ fclose(s->s_msg.datafp);
+ s->s_msg.datafp = NULL;
+ imsg_compose(s->s_env->sc_ibufs[PROC_QUEUE], IMSG_QUEUE_REMOVE_MESSAGE,
+ 0, 0, -1, &s->s_msg, sizeof(s->s_msg));
+ }
+
imsg_compose(s->s_env->sc_ibufs[PROC_QUEUE],
- IMSG_QUEUE_CREATE_MESSAGE_FILE, 0, 0, -1, &s->s_msg,
+ IMSG_QUEUE_CREATE_MESSAGE, 0, 0, -1, &s->s_msg,
sizeof(s->s_msg));
-
+ bufferevent_disable(s->s_bev, EV_READ);
break;
case S_MAIL:
- evbuffer_add_printf(s->s_bev->output,
- "%d Sender ok\r\n", ss->code);
- strlcpy(s->s_msg.message_id, ss->u.msgid, MAXPATHLEN);
+ evbuffer_add_printf(s->s_bev->output, "%d Sender ok\r\n",
+ ss->code);
- if (s->s_msg.datafp == NULL) {
- /* Remove message file */
- imsg_compose(s->s_env->sc_ibufs[PROC_QUEUE], IMSG_QUEUE_DELETE_MESSAGE_FILE,
- 0, 0, -1, &s->s_msg, sizeof(s->s_msg));
- return;
- }
break;
case S_RCPTREQUEST:
@@ -629,22 +634,20 @@ session_pickup(struct session *s, struct submit_status *ss)
s->s_msg.rcptcount++;
s->s_msg.recipient = ss->u.path;
imsg_compose(s->s_env->sc_ibufs[PROC_QUEUE],
- IMSG_QUEUE_MESSAGE_SUBMIT, 0, 0, -1, &s->s_msg,
+ IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, &s->s_msg,
sizeof(s->s_msg));
-
+ bufferevent_disable(s->s_bev, EV_READ);
break;
case S_RCPT:
- evbuffer_add_printf(s->s_bev->output,
- "%d Recipient ok\r\n", ss->code);
+ evbuffer_add_printf(s->s_bev->output, "%d Recipient ok\r\n",
+ ss->code);
break;
case S_DATA:
- if (s->s_msg.datafp == NULL) {
- evbuffer_add_printf(s->s_bev->output,
- "421 Service temporarily unavailable\r\n");
- return;
- }
+ if (s->s_msg.datafp == NULL)
+ goto tempfail;
+
s->s_state = S_DATACONTENT;
evbuffer_add_printf(s->s_bev->output,
"354 Enter mail, end with \".\" on a line by itself\r\n");
@@ -655,6 +658,7 @@ session_pickup(struct session *s, struct submit_status *ss)
case S_DONE:
s->s_state = S_HELO;
+
s->s_msg.datafp = NULL;
evbuffer_add_printf(s->s_bev->output,
"250 %s Message accepted for delivery\r\n",
@@ -667,6 +671,14 @@ session_pickup(struct session *s, struct submit_status *ss)
fatal("session_pickup: unknown state");
break;
}
+
+ return;
+
+tempfail:
+ s->s_flags |= F_QUIT;
+ evbuffer_add_printf(s->s_bev->output,
+ "421 Service temporarily unavailable\r\n");
+ return;
}
void
@@ -709,8 +721,6 @@ read:
s->s_tm = time(NULL);
line = evbuffer_readline(bev->input);
if (line == NULL) {
- bufferevent_disable(s->s_bev, EV_READ);
- bufferevent_enable(s->s_bev, EV_WRITE);
return;
}
@@ -726,9 +736,8 @@ read:
s->s_msg.datafp = NULL;
/* Remove message file */
- imsg_compose(s->s_env->sc_ibufs[PROC_QUEUE], IMSG_QUEUE_DELETE_MESSAGE_FILE,
+ imsg_compose(s->s_env->sc_ibufs[PROC_QUEUE], IMSG_QUEUE_REMOVE_MESSAGE,
0, 0, -1, &s->s_msg, sizeof(s->s_msg));
- bufferevent_enable(s->s_bev, EV_WRITE);
free(line);
return;
}
@@ -749,7 +758,8 @@ read:
for (i = 0; i < len; ++i) {
if (line[i] & 0x80) {
s->s_msg.status |= S_MESSAGE_PERMFAILURE;
- strlcpy(s->s_msg.session_errorline, "8BIT data transfered over 7BIT limited channel",
+ strlcpy(s->s_msg.session_errorline,
+ "8BIT data transfered over 7BIT limited channel",
sizeof s->s_msg.session_errorline);
}
}
@@ -758,8 +768,6 @@ read:
}
goto read;
}
- bufferevent_disable(s->s_bev, EV_READ);
- bufferevent_enable(s->s_bev, EV_WRITE);
line[strcspn(line, "\r")] = '\0';
if ((ep = strchr(line, ':')) == NULL)
@@ -783,20 +791,14 @@ session_write(struct bufferevent *bev, void *p)
struct session *s = p;
if (!(s->s_flags & F_QUIT)) {
- if (! EVBUFFER_LENGTH(EVBUFFER_OUTPUT(bev))) {
- bufferevent_disable(s->s_bev, EV_WRITE);
- bufferevent_enable(s->s_bev, EV_READ);
- }
-
+
if (s->s_state == S_TLS)
session_pickup(s, NULL);
return;
}
- if (! EVBUFFER_LENGTH(EVBUFFER_OUTPUT(bev))) {
- session_destroy(s);
- }
+ session_destroy(s);
}
void
@@ -811,13 +813,14 @@ session_destroy(struct session *s)
if (s->s_msg.datafp != NULL) {
fclose(s->s_msg.datafp);
s->s_msg.datafp = NULL;
- /* Remove message file */
- imsg_compose(s->s_env->sc_ibufs[PROC_QUEUE], IMSG_QUEUE_DELETE_MESSAGE_FILE,
+ }
+
+ if (s->s_state > S_MAIL) {
+ imsg_compose(s->s_env->sc_ibufs[PROC_QUEUE], IMSG_QUEUE_REMOVE_MESSAGE,
0, 0, -1, &s->s_msg, sizeof(s->s_msg));
}
if (s->s_bev != NULL) {
- bufferevent_disable(s->s_bev, EV_READ|EV_WRITE);
bufferevent_free(s->s_bev);
}
ssl_session_destroy(s);
@@ -839,7 +842,7 @@ void
session_msg_submit(struct session *s)
{
imsg_compose(s->s_env->sc_ibufs[PROC_QUEUE],
- IMSG_QUEUE_MESSAGE_COMPLETE, 0, 0, -1, &s->s_msg,
+ IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &s->s_msg,
sizeof(s->s_msg));
s->s_state = S_DONE;
}
diff --git a/usr.sbin/smtpd/smtpd.c b/usr.sbin/smtpd/smtpd.c
index 335bb1e2ac7..1dd188dd22e 100644
--- a/usr.sbin/smtpd/smtpd.c
+++ b/usr.sbin/smtpd/smtpd.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: smtpd.c,v 1.11 2008/11/22 22:22:05 gilles Exp $ */
+/* $OpenBSD: smtpd.c,v 1.12 2008/12/03 17:58:00 gilles Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -59,6 +59,7 @@ int parent_maildir_init(struct passwd *, char *);
int parent_external_mda(struct batch *, struct path *);
int check_child(pid_t, const char *);
int setup_spool(uid_t, gid_t);
+u_int16_t queue_message_hash(struct message *);
pid_t lka_pid = 0;
pid_t mfa_pid = 0;
@@ -622,8 +623,9 @@ int
setup_spool(uid_t uid, gid_t gid)
{
unsigned int n;
- char *paths[] = { PATH_MESSAGES, PATH_LOCAL, PATH_RELAY,
- PATH_DAEMON, PATH_ENVELOPES };
+ char *paths[] = { PATH_INCOMING, PATH_QUEUE,
+ PATH_RUNQUEUE, PATH_RUNQUEUELOW,
+ PATH_RUNQUEUEHIGH };
char pathname[MAXPATHLEN];
struct stat sb;
int ret;
@@ -751,9 +753,14 @@ parent_open_message_file(struct batch *batchp)
int fd;
char pathname[MAXPATHLEN];
int spret;
+ u_int16_t hval;
+ struct message *messagep;
- spret = snprintf(pathname, MAXPATHLEN, "%s%s/%s",
- PATH_SPOOL, PATH_MESSAGES, batchp->message_id);
+ messagep = &batchp->message;
+ hval = queue_message_hash(messagep);
+
+ spret = snprintf(pathname, MAXPATHLEN, "%s%s/%d/%s/message",
+ PATH_SPOOL, PATH_QUEUE, hval, batchp->message_id);
if (spret == -1 || spret >= MAXPATHLEN) {
batchp->message.status |= S_MESSAGE_PERMFAILURE;
return -1;
diff --git a/usr.sbin/smtpd/smtpd.h b/usr.sbin/smtpd/smtpd.h
index a20582e0a25..a4c9b296431 100644
--- a/usr.sbin/smtpd/smtpd.h
+++ b/usr.sbin/smtpd/smtpd.h
@@ -1,4 +1,4 @@
-/* $OpenBSD: smtpd.h,v 1.16 2008/11/25 23:06:15 gilles Exp $ */
+/* $OpenBSD: smtpd.h,v 1.17 2008/12/03 17:58:00 gilles Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -43,14 +43,19 @@
#define RCPTBUFSZ 256
+#define DIRHASH_BUCKETS 4096
+
#define PATH_SPOOL "/var/spool/smtpd"
-#define PATH_MESSAGES "/messages"
-#define PATH_LOCAL "/local"
-#define PATH_RELAY "/relay"
-#define PATH_DAEMON "/daemon"
+#define PATH_INCOMING "/incoming"
+#define PATH_QUEUE "/queue"
+#define PATH_MESSAGE "/message"
#define PATH_ENVELOPES "/envelopes"
+#define PATH_RUNQUEUE "/runqueue"
+#define PATH_RUNQUEUEHIGH "/runqueue-high"
+#define PATH_RUNQUEUELOW "/runqueue-low"
+
/* used by newaliases */
#define PATH_ALIASES "/etc/mail/aliases"
#define PATH_ALIASESDB "/etc/mail/aliases.db"
@@ -173,6 +178,12 @@ enum imsg_type {
IMSG_MFA_DATA_SUBMIT,
IMSG_MFA_LOOKUP_MAIL,
IMSG_MFA_LOOKUP_RCPT,
+
+ IMSG_QUEUE_CREATE_MESSAGE,
+ IMSG_QUEUE_SUBMIT_ENVELOPE,
+ IMSG_QUEUE_REMOVE_MESSAGE,
+ IMSG_QUEUE_COMMIT_MESSAGE,
+
IMSG_QUEUE_REMOVE_SUBMISSION,
IMSG_QUEUE_CREATE_MESSAGE_FILE,
IMSG_QUEUE_DELETE_MESSAGE_FILE,
@@ -183,6 +194,7 @@ enum imsg_type {
IMSG_QUEUE_BATCH_COMPLETE,
IMSG_QUEUE_BATCH_CLOSE,
IMSG_QUEUE_MESSAGE_FD,
+ IMSG_QUEUE_MESSAGE_FILE,
IMSG_QUEUE_ACCEPTED_CLOSE,
IMSG_QUEUE_RETRY_CLOSE,
@@ -195,6 +207,8 @@ enum imsg_type {
IMSG_BATCH_APPEND,
IMSG_BATCH_CLOSE,
+ IMSG_SMTP_MESSAGE_ID,
+
IMSG_SMTP_MESSAGE_FILE,
IMSG_SMTP_SUBMIT_ACK,
IMSG_SMTP_HOSTNAME_ANSWER,
@@ -414,11 +428,10 @@ enum message_status {
};
enum message_flags {
- F_MESSAGE_COMPLETE = 0x1,
- F_MESSAGE_RESOLVED = 0x2,
- F_MESSAGE_READY = 0x4,
- F_MESSAGE_EXPIRED = 0x8,
- F_MESSAGE_PROCESSING = 0x10
+ F_MESSAGE_RESOLVED = 0x1,
+ F_MESSAGE_EXPIRED = 0x2,
+ F_MESSAGE_SCHEDULED = 0x4,
+ F_MESSAGE_PROCESSING = 0x8
};
struct message {
@@ -609,6 +622,7 @@ struct smtpd {
u_int32_t sc_flags;
struct timeval sc_qintval;
struct event sc_ev;
+ struct event sc_rqev;
int sc_pipes[PROC_COUNT]
[PROC_COUNT][2];
struct imsgbuf *sc_ibufs[PROC_COUNT];