summaryrefslogtreecommitdiff
path: root/usr.sbin/smtpd/queue.c
diff options
context:
space:
mode:
authorGilles Chehade <gilles@cvs.openbsd.org>2008-12-05 02:51:33 +0000
committerGilles Chehade <gilles@cvs.openbsd.org>2008-12-05 02:51:33 +0000
commit747f62b5dbedca879d99609ee63547c8d6d68e49 (patch)
tree2425a8308ec1b78b4e6b95c3b4a061ef0daffd87 /usr.sbin/smtpd/queue.c
parent695bc4c79dcf8db0af4dc04498a5753d14ca39ad (diff)
- last part of the new queue code: the runner process (unprivileged and
chrooted) is now in charge of doing the scheduling of deliveries, and the dispatching of messages to MDA and MTA. queue process only does inserts/updates/removals from the queue and can no longer be so busy that it delays answers to imsg from smtp server.
Diffstat (limited to 'usr.sbin/smtpd/queue.c')
-rw-r--r--usr.sbin/smtpd/queue.c604
1 files changed, 50 insertions, 554 deletions
diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c
index 6d67033a1be..f3e4fc4422c 100644
--- a/usr.sbin/smtpd/queue.c
+++ b/usr.sbin/smtpd/queue.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: queue.c,v 1.18 2008/12/03 20:08:08 gilles Exp $ */
+/* $OpenBSD: queue.c,v 1.19 2008/12/05 02:51:32 gilles Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -51,10 +51,10 @@ void queue_dispatch_smtp(int, short, void *);
void queue_dispatch_mda(int, short, void *);
void queue_dispatch_mta(int, short, void *);
void queue_dispatch_lka(int, short, void *);
+void queue_dispatch_runner(int, short, void *);
void queue_setup_events(struct smtpd *);
void queue_disable_events(struct smtpd *);
void queue_timeout(int, short, void *);
-void queue_process_runqueue(int, short, void *);
int queue_create_incoming_layout(char *);
int queue_record_envelope(struct message *);
int queue_remove_envelope(struct message *);
@@ -81,7 +81,6 @@ void batch_send(struct smtpd *, struct batch *, time_t);
u_int32_t hash(u_int8_t *, size_t);
struct batch *queue_record_batch(struct smtpd *, struct message *);
struct batch *batch_by_id(struct smtpd *, u_int64_t);
-struct batch *batch_lookup(struct smtpd *, struct message *);
struct message *message_by_id(struct smtpd *, struct batch *, u_int64_t);
void
@@ -541,12 +540,54 @@ queue_dispatch_lka(int sig, short event, void *p)
break;
}
- case IMSG_LKA_MX_LOOKUP: {
- queue_batch_resolved(env, imsg.data);
+ default:
+ log_debug("queue_dispatch_lka: unexpected imsg %d",
+ imsg.hdr.type);
break;
}
+ imsg_free(&imsg);
+ }
+ imsg_event_add(ibuf);
+}
+
+void
+queue_dispatch_runner(int sig, short event, void *p)
+{
+ struct smtpd *env = p;
+ struct imsgbuf *ibuf;
+ struct imsg imsg;
+ ssize_t n;
+
+ ibuf = env->sc_ibufs[PROC_RUNNER];
+ switch (event) {
+ case EV_READ:
+ if ((n = imsg_read(ibuf)) == -1)
+ fatal("imsg_read_error");
+ if (n == 0) {
+ /* this pipe is dead, so remove the event handler */
+ event_del(&ibuf->ev);
+ event_loopexit(NULL);
+ return;
+ }
+ break;
+ case EV_WRITE:
+ if (msgbuf_write(&ibuf->w) == -1)
+ fatal("msgbuf_write");
+ imsg_event_add(ibuf);
+ return;
+ default:
+ fatalx("unknown event");
+ }
+
+ for (;;) {
+ if ((n = imsg_get(ibuf, &imsg)) == -1)
+ fatal("queue_dispatch_runner: imsg_read error");
+ if (n == 0)
+ break;
+
+ switch (imsg.hdr.type) {
default:
- log_debug("queue_dispatch_lka: unexpected imsg %d",
+ log_debug("queue_dispatch_runner: unexpected imsg %d",
imsg.hdr.type);
break;
}
@@ -565,57 +606,11 @@ queue_shutdown(void)
void
queue_setup_events(struct smtpd *env)
{
- struct timeval tv;
-
- evtimer_set(&env->sc_ev, queue_timeout, env);
- tv.tv_sec = 0;
- tv.tv_usec = 10;
- evtimer_add(&env->sc_ev, &tv);
-
- evtimer_set(&env->sc_rqev, queue_process_runqueue, env);
- tv.tv_sec = 0;
- tv.tv_usec = 10;
- evtimer_add(&env->sc_rqev, &tv);
}
void
queue_disable_events(struct smtpd *env)
{
- evtimer_del(&env->sc_ev);
-}
-
-void
-queue_timeout(int fd, short event, void *p)
-{
- struct smtpd *env = p;
- struct timeval tv;
- time_t curtime;
- struct batch *batchp, *nxt;
-
- queue_process(env);
-
- curtime = time(NULL);
-
- for (batchp = SPLAY_MIN(batchtree, &env->batch_queue);
- batchp != NULL;
- batchp = nxt) {
- nxt = SPLAY_NEXT(batchtree, &env->batch_queue, batchp);
- if ((batchp->type & T_MTA_BATCH) &&
- (batchp->flags & F_BATCH_RESOLVED) == 0) {
- continue;
- }
-
- batch_send(env, batchp, curtime);
-
- SPLAY_REMOVE(batchtree, &env->batch_queue, batchp);
- bzero(batchp, sizeof(struct batch));
- free(batchp);
-
- }
-
- tv.tv_sec = 0;
- tv.tv_usec = 10;
- evtimer_add(&env->sc_ev, &tv);
}
pid_t
@@ -632,7 +627,8 @@ queue(struct smtpd *env)
{ PROC_SMTP, queue_dispatch_smtp },
{ PROC_MDA, queue_dispatch_mda },
{ PROC_MTA, queue_dispatch_mta },
- { PROC_LKA, queue_dispatch_lka }
+ { PROC_LKA, queue_dispatch_lka },
+ { PROC_RUNNER, queue_dispatch_runner }
};
switch (pid = fork()) {
@@ -667,8 +663,6 @@ queue(struct smtpd *env)
fatal("queue: cannot drop privileges");
#endif
- SPLAY_INIT(&env->batch_queue);
-
event_init();
signal_set(&ev_sigint, SIGINT, queue_sig_handler, env);
@@ -678,7 +672,7 @@ queue(struct smtpd *env)
signal(SIGPIPE, SIG_IGN);
signal(SIGHUP, SIG_IGN);
- config_peers(env, peers, 5);
+ config_peers(env, peers, 6);
queue_setup_events(env);
event_dispatch();
@@ -687,257 +681,6 @@ queue(struct smtpd *env)
return (0);
}
-void
-queue_process(struct smtpd *env)
-{
- u_int16_t cbucket = 0;
- static u_int16_t lbucket = 0;
- DIR *dirp;
- struct dirent *dp;
- const char *errstr;
- static u_int8_t bucketdone = 1;
-
- if (! bucketdone) {
- bucketdone = queue_process_bucket(env, lbucket);
- if (bucketdone)
- lbucket = (lbucket + 1) % DIRHASH_BUCKETS;
- return;
- }
-
- dirp = opendir(PATH_QUEUE);
- if (dirp == NULL)
- fatal("queue_process: opendir");
-
- while ((dp = readdir(dirp)) != NULL) {
-
- if (strcmp(dp->d_name, ".") == 0 ||
- strcmp(dp->d_name, "..") == 0)
- continue;
-
- cbucket = strtonum(dp->d_name, 0, DIRHASH_BUCKETS - 1, &errstr);
- if (errstr) {
- log_warn("queue_process: %s/%s is not a valid bucket",
- PATH_QUEUE, dp->d_name);
- continue;
- }
-
- if (cbucket == lbucket)
- break;
- }
- closedir(dirp);
-
- if (dp == NULL) {
- lbucket = (lbucket + 1) % DIRHASH_BUCKETS;
- return;
- }
-
- bucketdone = queue_process_bucket(env, cbucket);
- if (bucketdone)
- lbucket = (lbucket + 1) % DIRHASH_BUCKETS;
-}
-
-int
-queue_process_bucket(struct smtpd *env, u_int16_t bucket)
-{
- int spret;
- static DIR *dirp = NULL;
- struct dirent *dp;
- static char *msgid = NULL;
- char bucketpath[MAXPATHLEN];
- static u_int8_t messagedone = 1;
-
- if (! messagedone) {
- messagedone = queue_process_message(env, msgid);
- if (! messagedone)
- return 0;
- msgid = NULL;
- }
-
- spret = snprintf(bucketpath, MAXPATHLEN, "%s/%d", PATH_QUEUE, bucket);
- if (spret == -1 || spret >= MAXPATHLEN)
- fatal("queue_process_bucket: snprintf");
-
- if (dirp == NULL) {
- dirp = opendir(bucketpath);
- if (dirp == NULL)
- fatal("queue_process_bucket: opendir");
- }
-
- while ((dp = readdir(dirp)) != NULL) {
-
- if (strcmp(dp->d_name, ".") == 0 ||
- strcmp(dp->d_name, "..") == 0)
- continue;
-
- break;
- }
-
- if (dp != NULL) {
- msgid = dp->d_name;
- messagedone = queue_process_message(env, msgid);
- if (! messagedone)
- return 0;
- msgid = NULL;
- }
-
- closedir(dirp);
- dirp = NULL;
- return 1;
-}
-
-int
-queue_process_message(struct smtpd *env, char *messageid)
-{
- int spret;
- static DIR *dirp = NULL;
- struct dirent *dp;
- char evppath[MAXPATHLEN];
- u_int16_t hval = 0;
-
- hval = hash(messageid, strlen(messageid)) % DIRHASH_BUCKETS;
-
- spret = snprintf(evppath, MAXPATHLEN, "%s/%d/%s%s", PATH_QUEUE, hval,
- messageid, PATH_ENVELOPES);
- if (spret == -1 || spret >= MAXPATHLEN)
- fatal("queue_process_message: snprintf");
-
- if (dirp == NULL) {
- dirp = opendir(evppath);
- if (dirp == NULL)
- fatal("queue_process_message: opendir");
- }
-
- while ((dp = readdir(dirp)) != NULL) {
-
- if (strcmp(dp->d_name, ".") == 0 ||
- strcmp(dp->d_name, "..") == 0)
- continue;
- break;
- }
-
- if (dp != NULL) {
- queue_process_envelope(env, messageid, dp->d_name);
- return 0;
- }
-
- closedir(dirp);
- dirp = NULL;
- return 1;
-}
-
-void
-queue_process_envelope(struct smtpd *env, char *msgid, char *evpid)
-{
- int spret;
- struct message message;
- time_t tm;
- char evppath[MAXPATHLEN];
- char rqpath[MAXPATHLEN];
- u_int16_t hval;
- struct stat sb;
-
- if (! queue_load_envelope(&message, evpid)) {
- log_debug("failed to load envelope: %s", evpid);
- return;
- }
-
- tm = time(NULL);
-
- if (! queue_message_schedule(&message, tm)) {
- if (message.flags & F_MESSAGE_EXPIRED) {
- log_debug("message has expired, mdaemon");
- queue_remove_envelope(&message);
- }
- return;
- }
-
- message.flags |= F_MESSAGE_SCHEDULED;
- queue_update_envelope(&message);
-
- log_debug("SCHEDULED: %s", evpid);
- hval = hash(msgid, strlen(msgid)) % DIRHASH_BUCKETS;
- spret = snprintf(evppath, MAXPATHLEN, "%s/%d/%s%s/%s", PATH_QUEUE, hval,
- msgid, PATH_ENVELOPES, evpid);
- if (spret == -1 || spret >= MAXPATHLEN)
- fatal("queue_process_envelope: snprintf");
-
- spret = snprintf(rqpath, MAXPATHLEN, "%s/%s", PATH_RUNQUEUE, evpid);
- if (spret == -1 || spret >= MAXPATHLEN)
- fatal("queue_process_envelope: snprintf");
-
- if (stat(rqpath, &sb) == -1) {
- if (errno != ENOENT)
- fatal("queue_process_envelope: stat");
-
- if (symlink(evppath, rqpath) == -1) {
- log_info("queue_process_envelope: "
- "failed to place envelope in runqueue");
- }
- }
-}
-
-void
-queue_process_runqueue(int fd, short event, void *p)
-{
- DIR *dirp;
- struct dirent *dp;
- struct message message;
- struct message *messagep;
- struct batch *batchp;
- char pathname[MAXPATHLEN];
- time_t tm;
- struct smtpd *env = p;
- struct timeval tv;
-
- tm = time(NULL);
-
- dirp = opendir(PATH_RUNQUEUE);
- if (dirp == NULL)
- fatal("queue_process_runqueue: opendir");
-
- while ((dp = readdir(dirp)) != NULL) {
- if (strcmp(dp->d_name, ".") == 0 ||
- strcmp(dp->d_name, "..") == 0)
- continue;
-
- /* XXX */
- snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_RUNQUEUE, dp->d_name);
- unlink(pathname);
-
- if (! queue_load_envelope(&message, dp->d_name)) {
- log_debug("failed to load envelope");
- continue;
- }
-
- if (message.flags & F_MESSAGE_PROCESSING)
- continue;
-
- message.lasttry = tm;
- message.flags &= ~F_MESSAGE_SCHEDULED;
- message.flags |= F_MESSAGE_PROCESSING;
- queue_update_envelope(&message);
-
- messagep = calloc(1, sizeof (struct message));
- if (messagep == NULL)
- err(1, "calloc");
- *messagep = message;
-
- batchp = batch_lookup(env, messagep);
- if (batchp != NULL)
- messagep->batch_id = batchp->id;
-
- batchp = queue_record_batch(env, messagep);
- if (messagep->batch_id == 0)
- messagep->batch_id = batchp->id;
- }
-
- closedir(dirp);
-
- tv.tv_sec = 0;
- tv.tv_usec = 10;
- evtimer_add(&env->sc_rqev, &tv);
-}
-
u_int64_t
queue_generate_id(void)
{
@@ -955,62 +698,6 @@ queue_generate_id(void)
return (id);
}
-struct batch *
-queue_record_batch(struct smtpd *env, struct message *messagep)
-{
- struct batch *batchp;
- struct path *path;
-
- batchp = NULL;
- if (messagep->batch_id != 0) {
- batchp = batch_by_id(env, messagep->batch_id);
- if (batchp == NULL)
- errx(1, "%s: internal inconsistency.", __func__);
- }
-
- if (batchp == NULL) {
- batchp = calloc(1, sizeof(struct batch));
- if (batchp == NULL)
- err(1, "%s: calloc", __func__);
-
- batchp->id = queue_generate_id();
- batchp->creation = messagep->creation;
-
- (void)strlcpy(batchp->message_id, messagep->message_id,
- sizeof(batchp->message_id));
-
- TAILQ_INIT(&batchp->messages);
- SPLAY_INSERT(batchtree, &env->batch_queue, batchp);
-
- if (messagep->type & T_DAEMON_MESSAGE) {
- batchp->type = T_DAEMON_BATCH;
- path = &messagep->sender;
- }
- else {
- path = &messagep->recipient;
- }
-
- batchp->rule = path->rule;
-
- (void)strlcpy(batchp->hostname, path->domain,
- sizeof(batchp->hostname));
-
- if (IS_MAILBOX(path->rule.r_action) ||
- IS_EXT(path->rule.r_action)) {
- batchp->type |= T_MDA_BATCH;
- }
- else {
- batchp->type |= T_MTA_BATCH;
- imsg_compose(env->sc_ibufs[PROC_LKA], IMSG_LKA_MX_LOOKUP, 0, 0, -1,
- batchp, sizeof(struct batch));
- }
- }
-
- TAILQ_INSERT_TAIL(&batchp->messages, messagep, entry);
-
- return batchp;
-}
-
int
queue_remove_batch_message(struct smtpd *env, struct batch *batchp, struct message *messagep)
{
@@ -1028,195 +715,6 @@ queue_remove_batch_message(struct smtpd *env, struct batch *batchp, struct messa
return 0;
}
-int
-queue_batch_resolved(struct smtpd *env, struct batch *lookup)
-{
- u_int32_t i;
- struct batch *batchp;
-
- batchp = batch_by_id(env, lookup->id);
- batchp->getaddrinfo_error = lookup->getaddrinfo_error;
- batchp->mx_cnt = lookup->mx_cnt;
-
-/*
- EAI_NODATA no address associated with hostname
- EAI_NONAME hostname or servname not provided, or not known
- EAI_PROTOCOL resolved protocol is unknown
- EAI_SERVICE servname not supported for ai_socktype
- EAI_SOCKTYPE ai_socktype not supported
- EAI_SYSTEM system error returned in errno
-
-
- */
-
- switch (batchp->getaddrinfo_error) {
- case EAI_ADDRFAMILY:
- case EAI_BADFLAGS:
- case EAI_BADHINTS:
- case EAI_FAIL:
- case EAI_FAMILY:
- case EAI_NODATA:
- case EAI_NONAME:
- case EAI_SERVICE:
- case EAI_SOCKTYPE:
- case EAI_SYSTEM:
- /* XXX */
- /*
- * In the case of a DNS permanent error, do not generate a
- * daemon message if the error originates from one already
- * as this would cause a loop. Remove the initial batch as
- * it will never succeed.
- *
- */
- return 0;
-
- case EAI_AGAIN:
- case EAI_MEMORY:
- /* XXX */
- /*
- * Do not generate a daemon message if this error happened
- * while processing a daemon message. Do NOT remove batch,
- * it may succeed later.
- */
- return 0;
-
- default:
- batchp->flags |= F_BATCH_RESOLVED;
- for (i = 0; i < batchp->mx_cnt; ++i)
- batchp->mxarray[i].ss = lookup->mxarray[i].ss;
- }
- return 1;
-}
-
-struct batch *
-batch_lookup(struct smtpd *env, struct message *message)
-{
- struct batch *batchp;
- struct batch lookup;
-
- /* If message->batch_id != 0, we can retrieve batch by id */
- if (message->batch_id != 0) {
- lookup.id = message->batch_id;
- return SPLAY_FIND(batchtree, &env->batch_queue, &lookup);
- }
-
- /* We do not know the batch_id yet, maybe it was created but we could not
- * be notified, or it just does not exist. Let's scan to see if we can do
- * a match based on our message_id and flags.
- */
- SPLAY_FOREACH(batchp, batchtree, &env->batch_queue) {
-
- if (batchp->type != message->type)
- continue;
-
- if (strcasecmp(batchp->message_id, message->message_id) != 0)
- continue;
-
- if (batchp->type & T_MTA_BATCH)
- if (strcasecmp(batchp->hostname, message->recipient.domain) != 0)
- continue;
-
- break;
- }
-
- return batchp;
-}
-
-int
-batch_cmp(struct batch *s1, struct batch *s2)
-{
- /*
- * do not return u_int64_t's
- */
- if (s1->id < s2->id)
- return (-1);
-
- if (s1->id > s2->id)
- return (1);
-
- return (0);
-}
-
-int
-queue_message_schedule(struct message *messagep, time_t tm)
-{
- time_t delay;
-
- /* Batch has been in the queue for too long and expired */
- if (tm - messagep->creation >= SMTPD_QUEUE_EXPIRY) {
- messagep->flags |= F_MESSAGE_EXPIRED;
- return 0;
- }
-
- if (messagep->retry == 255) {
- messagep->flags |= F_MESSAGE_EXPIRED;
- return 0;
- }
-
- if ((messagep->flags & F_MESSAGE_SCHEDULED) != 0)
- return 0;
-
- if ((messagep->flags & F_MESSAGE_PROCESSING) != 0)
- return 0;
-
- if (messagep->lasttry == 0)
- return 1;
-
- delay = SMTPD_QUEUE_MAXINTERVAL;
-
- if (messagep->type & T_MDA_MESSAGE) {
- if (messagep->retry < 5)
- return 1;
-
- if (messagep->retry < 15)
- delay = (messagep->retry * 60) + arc4random() % 60;
- }
-
- if (messagep->type & T_MTA_MESSAGE) {
- if (messagep->retry < 3)
- delay = SMTPD_QUEUE_INTERVAL;
- else if (messagep->retry <= 7) {
- delay = SMTPD_QUEUE_INTERVAL * (1 << (messagep->retry - 3));
- if (delay > SMTPD_QUEUE_MAXINTERVAL)
- delay = SMTPD_QUEUE_MAXINTERVAL;
- }
- }
-
- if (tm >= messagep->lasttry + delay)
- return 1;
-
- return 0;
-}
-
-void
-batch_send(struct smtpd *env, struct batch *batchp, time_t curtime)
-{
- u_int8_t proctype;
- struct message *messagep;
-
- if ((batchp->type & (T_MDA_BATCH|T_MTA_BATCH)) == 0)
- fatal("batch_send: unknown batch type");
-
- if (batchp->type & T_MDA_BATCH)
- proctype = PROC_MDA;
- else if (batchp->type & T_MTA_BATCH)
- proctype = PROC_MTA;
-
- imsg_compose(env->sc_ibufs[proctype], IMSG_CREATE_BATCH, 0, 0, -1,
- batchp, sizeof (struct batch));
-
- while ((messagep = TAILQ_FIRST(&batchp->messages))) {
- imsg_compose(env->sc_ibufs[proctype], IMSG_BATCH_APPEND, 0, 0,
- -1, messagep, sizeof (struct message));
- TAILQ_REMOVE(&batchp->messages, messagep, entry);
- bzero(messagep, sizeof(struct message));
- free(messagep);
- }
-
- imsg_compose(env->sc_ibufs[proctype], IMSG_BATCH_CLOSE, 0, 0, -1,
- batchp, sizeof(struct batch));
-}
-
struct batch *
batch_by_id(struct smtpd *env, u_int64_t id)
{
@@ -1763,5 +1261,3 @@ hash(u_int8_t *buf, size_t len)
return h;
}
-
-SPLAY_GENERATE(batchtree, batch, b_nodes, batch_cmp);