diff options
author | Gilles Chehade <gilles@cvs.openbsd.org> | 2008-12-05 02:51:33 +0000 |
---|---|---|
committer | Gilles Chehade <gilles@cvs.openbsd.org> | 2008-12-05 02:51:33 +0000 |
commit | 747f62b5dbedca879d99609ee63547c8d6d68e49 (patch) | |
tree | 2425a8308ec1b78b4e6b95c3b4a061ef0daffd87 /usr.sbin/smtpd/queue.c | |
parent | 695bc4c79dcf8db0af4dc04498a5753d14ca39ad (diff) |
- Last part of the new queue code: the runner process (unprivileged and
chrooted) is now in charge of scheduling deliveries and of dispatching
messages to the MDA and MTA. The queue process now only performs
inserts/updates/removals on the queue, so it can no longer become
so busy that it delays answers to imsgs from the smtp server.
Diffstat (limited to 'usr.sbin/smtpd/queue.c')
-rw-r--r-- | usr.sbin/smtpd/queue.c | 604 |
1 files changed, 50 insertions, 554 deletions
diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c index 6d67033a1be..f3e4fc4422c 100644 --- a/usr.sbin/smtpd/queue.c +++ b/usr.sbin/smtpd/queue.c @@ -1,4 +1,4 @@ -/* $OpenBSD: queue.c,v 1.18 2008/12/03 20:08:08 gilles Exp $ */ +/* $OpenBSD: queue.c,v 1.19 2008/12/05 02:51:32 gilles Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> @@ -51,10 +51,10 @@ void queue_dispatch_smtp(int, short, void *); void queue_dispatch_mda(int, short, void *); void queue_dispatch_mta(int, short, void *); void queue_dispatch_lka(int, short, void *); +void queue_dispatch_runner(int, short, void *); void queue_setup_events(struct smtpd *); void queue_disable_events(struct smtpd *); void queue_timeout(int, short, void *); -void queue_process_runqueue(int, short, void *); int queue_create_incoming_layout(char *); int queue_record_envelope(struct message *); int queue_remove_envelope(struct message *); @@ -81,7 +81,6 @@ void batch_send(struct smtpd *, struct batch *, time_t); u_int32_t hash(u_int8_t *, size_t); struct batch *queue_record_batch(struct smtpd *, struct message *); struct batch *batch_by_id(struct smtpd *, u_int64_t); -struct batch *batch_lookup(struct smtpd *, struct message *); struct message *message_by_id(struct smtpd *, struct batch *, u_int64_t); void @@ -541,12 +540,54 @@ queue_dispatch_lka(int sig, short event, void *p) break; } - case IMSG_LKA_MX_LOOKUP: { - queue_batch_resolved(env, imsg.data); + default: + log_debug("queue_dispatch_lka: unexpected imsg %d", + imsg.hdr.type); break; } + imsg_free(&imsg); + } + imsg_event_add(ibuf); +} + +void +queue_dispatch_runner(int sig, short event, void *p) +{ + struct smtpd *env = p; + struct imsgbuf *ibuf; + struct imsg imsg; + ssize_t n; + + ibuf = env->sc_ibufs[PROC_RUNNER]; + switch (event) { + case EV_READ: + if ((n = imsg_read(ibuf)) == -1) + fatal("imsg_read_error"); + if (n == 0) { + /* this pipe is dead, so remove the event handler */ + event_del(&ibuf->ev); + event_loopexit(NULL); + return; 
+ } + break; + case EV_WRITE: + if (msgbuf_write(&ibuf->w) == -1) + fatal("msgbuf_write"); + imsg_event_add(ibuf); + return; + default: + fatalx("unknown event"); + } + + for (;;) { + if ((n = imsg_get(ibuf, &imsg)) == -1) + fatal("queue_dispatch_runner: imsg_read error"); + if (n == 0) + break; + + switch (imsg.hdr.type) { default: - log_debug("queue_dispatch_lka: unexpected imsg %d", + log_debug("queue_dispatch_runner: unexpected imsg %d", imsg.hdr.type); break; } @@ -565,57 +606,11 @@ queue_shutdown(void) void queue_setup_events(struct smtpd *env) { - struct timeval tv; - - evtimer_set(&env->sc_ev, queue_timeout, env); - tv.tv_sec = 0; - tv.tv_usec = 10; - evtimer_add(&env->sc_ev, &tv); - - evtimer_set(&env->sc_rqev, queue_process_runqueue, env); - tv.tv_sec = 0; - tv.tv_usec = 10; - evtimer_add(&env->sc_rqev, &tv); } void queue_disable_events(struct smtpd *env) { - evtimer_del(&env->sc_ev); -} - -void -queue_timeout(int fd, short event, void *p) -{ - struct smtpd *env = p; - struct timeval tv; - time_t curtime; - struct batch *batchp, *nxt; - - queue_process(env); - - curtime = time(NULL); - - for (batchp = SPLAY_MIN(batchtree, &env->batch_queue); - batchp != NULL; - batchp = nxt) { - nxt = SPLAY_NEXT(batchtree, &env->batch_queue, batchp); - if ((batchp->type & T_MTA_BATCH) && - (batchp->flags & F_BATCH_RESOLVED) == 0) { - continue; - } - - batch_send(env, batchp, curtime); - - SPLAY_REMOVE(batchtree, &env->batch_queue, batchp); - bzero(batchp, sizeof(struct batch)); - free(batchp); - - } - - tv.tv_sec = 0; - tv.tv_usec = 10; - evtimer_add(&env->sc_ev, &tv); } pid_t @@ -632,7 +627,8 @@ queue(struct smtpd *env) { PROC_SMTP, queue_dispatch_smtp }, { PROC_MDA, queue_dispatch_mda }, { PROC_MTA, queue_dispatch_mta }, - { PROC_LKA, queue_dispatch_lka } + { PROC_LKA, queue_dispatch_lka }, + { PROC_RUNNER, queue_dispatch_runner } }; switch (pid = fork()) { @@ -667,8 +663,6 @@ queue(struct smtpd *env) fatal("queue: cannot drop privileges"); #endif - 
SPLAY_INIT(&env->batch_queue); - event_init(); signal_set(&ev_sigint, SIGINT, queue_sig_handler, env); @@ -678,7 +672,7 @@ queue(struct smtpd *env) signal(SIGPIPE, SIG_IGN); signal(SIGHUP, SIG_IGN); - config_peers(env, peers, 5); + config_peers(env, peers, 6); queue_setup_events(env); event_dispatch(); @@ -687,257 +681,6 @@ queue(struct smtpd *env) return (0); } -void -queue_process(struct smtpd *env) -{ - u_int16_t cbucket = 0; - static u_int16_t lbucket = 0; - DIR *dirp; - struct dirent *dp; - const char *errstr; - static u_int8_t bucketdone = 1; - - if (! bucketdone) { - bucketdone = queue_process_bucket(env, lbucket); - if (bucketdone) - lbucket = (lbucket + 1) % DIRHASH_BUCKETS; - return; - } - - dirp = opendir(PATH_QUEUE); - if (dirp == NULL) - fatal("queue_process: opendir"); - - while ((dp = readdir(dirp)) != NULL) { - - if (strcmp(dp->d_name, ".") == 0 || - strcmp(dp->d_name, "..") == 0) - continue; - - cbucket = strtonum(dp->d_name, 0, DIRHASH_BUCKETS - 1, &errstr); - if (errstr) { - log_warn("queue_process: %s/%s is not a valid bucket", - PATH_QUEUE, dp->d_name); - continue; - } - - if (cbucket == lbucket) - break; - } - closedir(dirp); - - if (dp == NULL) { - lbucket = (lbucket + 1) % DIRHASH_BUCKETS; - return; - } - - bucketdone = queue_process_bucket(env, cbucket); - if (bucketdone) - lbucket = (lbucket + 1) % DIRHASH_BUCKETS; -} - -int -queue_process_bucket(struct smtpd *env, u_int16_t bucket) -{ - int spret; - static DIR *dirp = NULL; - struct dirent *dp; - static char *msgid = NULL; - char bucketpath[MAXPATHLEN]; - static u_int8_t messagedone = 1; - - if (! messagedone) { - messagedone = queue_process_message(env, msgid); - if (! 
messagedone) - return 0; - msgid = NULL; - } - - spret = snprintf(bucketpath, MAXPATHLEN, "%s/%d", PATH_QUEUE, bucket); - if (spret == -1 || spret >= MAXPATHLEN) - fatal("queue_process_bucket: snprintf"); - - if (dirp == NULL) { - dirp = opendir(bucketpath); - if (dirp == NULL) - fatal("queue_process_bucket: opendir"); - } - - while ((dp = readdir(dirp)) != NULL) { - - if (strcmp(dp->d_name, ".") == 0 || - strcmp(dp->d_name, "..") == 0) - continue; - - break; - } - - if (dp != NULL) { - msgid = dp->d_name; - messagedone = queue_process_message(env, msgid); - if (! messagedone) - return 0; - msgid = NULL; - } - - closedir(dirp); - dirp = NULL; - return 1; -} - -int -queue_process_message(struct smtpd *env, char *messageid) -{ - int spret; - static DIR *dirp = NULL; - struct dirent *dp; - char evppath[MAXPATHLEN]; - u_int16_t hval = 0; - - hval = hash(messageid, strlen(messageid)) % DIRHASH_BUCKETS; - - spret = snprintf(evppath, MAXPATHLEN, "%s/%d/%s%s", PATH_QUEUE, hval, - messageid, PATH_ENVELOPES); - if (spret == -1 || spret >= MAXPATHLEN) - fatal("queue_process_message: snprintf"); - - if (dirp == NULL) { - dirp = opendir(evppath); - if (dirp == NULL) - fatal("queue_process_message: opendir"); - } - - while ((dp = readdir(dirp)) != NULL) { - - if (strcmp(dp->d_name, ".") == 0 || - strcmp(dp->d_name, "..") == 0) - continue; - break; - } - - if (dp != NULL) { - queue_process_envelope(env, messageid, dp->d_name); - return 0; - } - - closedir(dirp); - dirp = NULL; - return 1; -} - -void -queue_process_envelope(struct smtpd *env, char *msgid, char *evpid) -{ - int spret; - struct message message; - time_t tm; - char evppath[MAXPATHLEN]; - char rqpath[MAXPATHLEN]; - u_int16_t hval; - struct stat sb; - - if (! queue_load_envelope(&message, evpid)) { - log_debug("failed to load envelope: %s", evpid); - return; - } - - tm = time(NULL); - - if (! 
queue_message_schedule(&message, tm)) { - if (message.flags & F_MESSAGE_EXPIRED) { - log_debug("message has expired, mdaemon"); - queue_remove_envelope(&message); - } - return; - } - - message.flags |= F_MESSAGE_SCHEDULED; - queue_update_envelope(&message); - - log_debug("SCHEDULED: %s", evpid); - hval = hash(msgid, strlen(msgid)) % DIRHASH_BUCKETS; - spret = snprintf(evppath, MAXPATHLEN, "%s/%d/%s%s/%s", PATH_QUEUE, hval, - msgid, PATH_ENVELOPES, evpid); - if (spret == -1 || spret >= MAXPATHLEN) - fatal("queue_process_envelope: snprintf"); - - spret = snprintf(rqpath, MAXPATHLEN, "%s/%s", PATH_RUNQUEUE, evpid); - if (spret == -1 || spret >= MAXPATHLEN) - fatal("queue_process_envelope: snprintf"); - - if (stat(rqpath, &sb) == -1) { - if (errno != ENOENT) - fatal("queue_process_envelope: stat"); - - if (symlink(evppath, rqpath) == -1) { - log_info("queue_process_envelope: " - "failed to place envelope in runqueue"); - } - } -} - -void -queue_process_runqueue(int fd, short event, void *p) -{ - DIR *dirp; - struct dirent *dp; - struct message message; - struct message *messagep; - struct batch *batchp; - char pathname[MAXPATHLEN]; - time_t tm; - struct smtpd *env = p; - struct timeval tv; - - tm = time(NULL); - - dirp = opendir(PATH_RUNQUEUE); - if (dirp == NULL) - fatal("queue_process_runqueue: opendir"); - - while ((dp = readdir(dirp)) != NULL) { - if (strcmp(dp->d_name, ".") == 0 || - strcmp(dp->d_name, "..") == 0) - continue; - - /* XXX */ - snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_RUNQUEUE, dp->d_name); - unlink(pathname); - - if (! 
queue_load_envelope(&message, dp->d_name)) { - log_debug("failed to load envelope"); - continue; - } - - if (message.flags & F_MESSAGE_PROCESSING) - continue; - - message.lasttry = tm; - message.flags &= ~F_MESSAGE_SCHEDULED; - message.flags |= F_MESSAGE_PROCESSING; - queue_update_envelope(&message); - - messagep = calloc(1, sizeof (struct message)); - if (messagep == NULL) - err(1, "calloc"); - *messagep = message; - - batchp = batch_lookup(env, messagep); - if (batchp != NULL) - messagep->batch_id = batchp->id; - - batchp = queue_record_batch(env, messagep); - if (messagep->batch_id == 0) - messagep->batch_id = batchp->id; - } - - closedir(dirp); - - tv.tv_sec = 0; - tv.tv_usec = 10; - evtimer_add(&env->sc_rqev, &tv); -} - u_int64_t queue_generate_id(void) { @@ -955,62 +698,6 @@ queue_generate_id(void) return (id); } -struct batch * -queue_record_batch(struct smtpd *env, struct message *messagep) -{ - struct batch *batchp; - struct path *path; - - batchp = NULL; - if (messagep->batch_id != 0) { - batchp = batch_by_id(env, messagep->batch_id); - if (batchp == NULL) - errx(1, "%s: internal inconsistency.", __func__); - } - - if (batchp == NULL) { - batchp = calloc(1, sizeof(struct batch)); - if (batchp == NULL) - err(1, "%s: calloc", __func__); - - batchp->id = queue_generate_id(); - batchp->creation = messagep->creation; - - (void)strlcpy(batchp->message_id, messagep->message_id, - sizeof(batchp->message_id)); - - TAILQ_INIT(&batchp->messages); - SPLAY_INSERT(batchtree, &env->batch_queue, batchp); - - if (messagep->type & T_DAEMON_MESSAGE) { - batchp->type = T_DAEMON_BATCH; - path = &messagep->sender; - } - else { - path = &messagep->recipient; - } - - batchp->rule = path->rule; - - (void)strlcpy(batchp->hostname, path->domain, - sizeof(batchp->hostname)); - - if (IS_MAILBOX(path->rule.r_action) || - IS_EXT(path->rule.r_action)) { - batchp->type |= T_MDA_BATCH; - } - else { - batchp->type |= T_MTA_BATCH; - imsg_compose(env->sc_ibufs[PROC_LKA], IMSG_LKA_MX_LOOKUP, 
0, 0, -1, - batchp, sizeof(struct batch)); - } - } - - TAILQ_INSERT_TAIL(&batchp->messages, messagep, entry); - - return batchp; -} - int queue_remove_batch_message(struct smtpd *env, struct batch *batchp, struct message *messagep) { @@ -1028,195 +715,6 @@ queue_remove_batch_message(struct smtpd *env, struct batch *batchp, struct messa return 0; } -int -queue_batch_resolved(struct smtpd *env, struct batch *lookup) -{ - u_int32_t i; - struct batch *batchp; - - batchp = batch_by_id(env, lookup->id); - batchp->getaddrinfo_error = lookup->getaddrinfo_error; - batchp->mx_cnt = lookup->mx_cnt; - -/* - EAI_NODATA no address associated with hostname - EAI_NONAME hostname or servname not provided, or not known - EAI_PROTOCOL resolved protocol is unknown - EAI_SERVICE servname not supported for ai_socktype - EAI_SOCKTYPE ai_socktype not supported - EAI_SYSTEM system error returned in errno - - - */ - - switch (batchp->getaddrinfo_error) { - case EAI_ADDRFAMILY: - case EAI_BADFLAGS: - case EAI_BADHINTS: - case EAI_FAIL: - case EAI_FAMILY: - case EAI_NODATA: - case EAI_NONAME: - case EAI_SERVICE: - case EAI_SOCKTYPE: - case EAI_SYSTEM: - /* XXX */ - /* - * In the case of a DNS permanent error, do not generate a - * daemon message if the error originates from one already - * as this would cause a loop. Remove the initial batch as - * it will never succeed. - * - */ - return 0; - - case EAI_AGAIN: - case EAI_MEMORY: - /* XXX */ - /* - * Do not generate a daemon message if this error happened - * while processing a daemon message. Do NOT remove batch, - * it may succeed later. 
- */ - return 0; - - default: - batchp->flags |= F_BATCH_RESOLVED; - for (i = 0; i < batchp->mx_cnt; ++i) - batchp->mxarray[i].ss = lookup->mxarray[i].ss; - } - return 1; -} - -struct batch * -batch_lookup(struct smtpd *env, struct message *message) -{ - struct batch *batchp; - struct batch lookup; - - /* If message->batch_id != 0, we can retrieve batch by id */ - if (message->batch_id != 0) { - lookup.id = message->batch_id; - return SPLAY_FIND(batchtree, &env->batch_queue, &lookup); - } - - /* We do not know the batch_id yet, maybe it was created but we could not - * be notified, or it just does not exist. Let's scan to see if we can do - * a match based on our message_id and flags. - */ - SPLAY_FOREACH(batchp, batchtree, &env->batch_queue) { - - if (batchp->type != message->type) - continue; - - if (strcasecmp(batchp->message_id, message->message_id) != 0) - continue; - - if (batchp->type & T_MTA_BATCH) - if (strcasecmp(batchp->hostname, message->recipient.domain) != 0) - continue; - - break; - } - - return batchp; -} - -int -batch_cmp(struct batch *s1, struct batch *s2) -{ - /* - * do not return u_int64_t's - */ - if (s1->id < s2->id) - return (-1); - - if (s1->id > s2->id) - return (1); - - return (0); -} - -int -queue_message_schedule(struct message *messagep, time_t tm) -{ - time_t delay; - - /* Batch has been in the queue for too long and expired */ - if (tm - messagep->creation >= SMTPD_QUEUE_EXPIRY) { - messagep->flags |= F_MESSAGE_EXPIRED; - return 0; - } - - if (messagep->retry == 255) { - messagep->flags |= F_MESSAGE_EXPIRED; - return 0; - } - - if ((messagep->flags & F_MESSAGE_SCHEDULED) != 0) - return 0; - - if ((messagep->flags & F_MESSAGE_PROCESSING) != 0) - return 0; - - if (messagep->lasttry == 0) - return 1; - - delay = SMTPD_QUEUE_MAXINTERVAL; - - if (messagep->type & T_MDA_MESSAGE) { - if (messagep->retry < 5) - return 1; - - if (messagep->retry < 15) - delay = (messagep->retry * 60) + arc4random() % 60; - } - - if (messagep->type & 
T_MTA_MESSAGE) { - if (messagep->retry < 3) - delay = SMTPD_QUEUE_INTERVAL; - else if (messagep->retry <= 7) { - delay = SMTPD_QUEUE_INTERVAL * (1 << (messagep->retry - 3)); - if (delay > SMTPD_QUEUE_MAXINTERVAL) - delay = SMTPD_QUEUE_MAXINTERVAL; - } - } - - if (tm >= messagep->lasttry + delay) - return 1; - - return 0; -} - -void -batch_send(struct smtpd *env, struct batch *batchp, time_t curtime) -{ - u_int8_t proctype; - struct message *messagep; - - if ((batchp->type & (T_MDA_BATCH|T_MTA_BATCH)) == 0) - fatal("batch_send: unknown batch type"); - - if (batchp->type & T_MDA_BATCH) - proctype = PROC_MDA; - else if (batchp->type & T_MTA_BATCH) - proctype = PROC_MTA; - - imsg_compose(env->sc_ibufs[proctype], IMSG_CREATE_BATCH, 0, 0, -1, - batchp, sizeof (struct batch)); - - while ((messagep = TAILQ_FIRST(&batchp->messages))) { - imsg_compose(env->sc_ibufs[proctype], IMSG_BATCH_APPEND, 0, 0, - -1, messagep, sizeof (struct message)); - TAILQ_REMOVE(&batchp->messages, messagep, entry); - bzero(messagep, sizeof(struct message)); - free(messagep); - } - - imsg_compose(env->sc_ibufs[proctype], IMSG_BATCH_CLOSE, 0, 0, -1, - batchp, sizeof(struct batch)); -} - struct batch * batch_by_id(struct smtpd *env, u_int64_t id) { @@ -1763,5 +1261,3 @@ hash(u_int8_t *buf, size_t len) return h; } - -SPLAY_GENERATE(batchtree, batch, b_nodes, batch_cmp); |