diff options
Diffstat (limited to 'usr.sbin/smtpd/queue.c')
-rw-r--r-- | usr.sbin/smtpd/queue.c | 1494 |
1 files changed, 170 insertions, 1324 deletions
diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c index a5a38a335ca..6d774e85bd9 100644 --- a/usr.sbin/smtpd/queue.c +++ b/usr.sbin/smtpd/queue.c @@ -1,7 +1,6 @@ -/* $OpenBSD: queue.c,v 1.90 2010/09/20 09:01:09 gilles Exp $ */ +/* $OpenBSD: queue.c,v 1.91 2010/10/09 22:05:35 gilles Exp $ */ /* - * Copyright (c) 2008-2010 Jacek Masiulaniec <jacekm@dobremiasto.net> * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> * @@ -24,14 +23,11 @@ #include <sys/param.h> #include <sys/socket.h> #include <sys/stat.h> -#include <sys/uio.h> -#include <ctype.h> #include <errno.h> #include <event.h> #include <fcntl.h> #include <libgen.h> -#include <math.h> #include <pwd.h> #include <signal.h> #include <stdio.h> @@ -40,315 +36,156 @@ #include <unistd.h> #include "smtpd.h" -#include "queue_backend.h" -#include "client.h" -void queue_imsg(struct smtpd *, struct imsgev *, struct imsg *); -int queue_append(struct incoming *, char *); -void queue_destroy(struct incoming *); -int queue_control(u_int64_t, int); -__dead void queue_shutdown(void); -void queue_sig_handler(int, short, void *); - -void queue_mem_init(struct smtpd *); -void queue_mem_content_unref(struct content *); - -void queue_send(int, short, void *); -void queue_expire(struct batch *); -void queue_update(int, int, u_int64_t, char *); -void queue_done(int, int); -void queue_schedule(int, struct batch *); -void queue_sleep(int); -time_t queue_retry(int, time_t, time_t); - -void queue_bounce_wait(struct content *); -void queue_bounce_schedule(int, short, void *); -void queue_bounce_init(int, int); -void queue_bounce_event(int, short, void *); - -int queue_detect_loop(struct incoming *); - -struct incoming *incoming_alloc(u_int64_t); -struct batch *incoming_batch(struct incoming *, char *); -void incoming_schedule(struct incoming *); - -struct content *content_alloc(u_int64_t); - -struct batch *batch_alloc(struct content *, char *); - -struct action *action_alloc(u_int64_t); -void action_insert(struct action *, struct batch *); -struct action *action_grow(struct action *, char *); -void action_free(struct action *); - -int batchsort(const void *, const void *); - -/* table of batches in larval state */ -void **incoming; -int incoming_sz; - -struct queue runqs[3]; +void queue_imsg(struct smtpd *, struct imsgev *, struct imsg *); +void queue_pass_to_runner(struct smtpd *, struct imsgev *, struct imsg *); +__dead void queue_shutdown(void); +void queue_sig_handler(int, short, void *); +void queue_setup_events(struct smtpd *); +void queue_disable_events(struct smtpd *); +void queue_purge(char *); + +int queue_create_layout_message(char *, char *); +void queue_delete_layout_message(char *, char *); +int queue_record_layout_envelope(char *, struct message *); +int queue_remove_layout_envelope(char *, struct message *); +int queue_commit_layout_message(char *, struct message *); +int queue_open_layout_messagefile(char *, struct message *); void queue_imsg(struct smtpd *env, struct imsgev *iev, struct imsg *imsg) { - struct action *update; - struct incoming *s; + struct submit_status ss; struct message *m; - u_int64_t content_id; - int i, fd, error; - struct iovec iov[2]; - char aux[2048]; /* XXX */ + struct batch *b; + int fd, ret; if (iev->proc == PROC_SMTP) { - switch (imsg->hdr.type) { - case IMSG_QUEUE_CREATE: - /* - * Create file that will hold mail content. Its name - * uniquely identifies entire mail transaction. Actions - * will refer to the this file as source of mail content. - */ - if (queue_be_content_create(&content_id) < 0) - content_id = INVALID_ID; - - s = incoming_alloc(content_id); - if (s == NULL) - fatal(NULL); + m = imsg->data; - i = table_alloc(&incoming, &incoming_sz); - incoming[i] = s; - - iov[0].iov_base = &content_id; - iov[0].iov_len = sizeof content_id; - iov[1].iov_base = &i; - iov[1].iov_len = sizeof i; - imsg_composev(&iev->ibuf, IMSG_QUEUE_CREATE, - imsg->hdr.peerid, 0, -1, iov, 2); - imsg_event_add(iev); - return; - - case IMSG_QUEUE_DELETE: - /* - * Delete failed transaction's content and actions. - */ - memcpy(&i, imsg->data, sizeof i); - - s = table_lookup(incoming, incoming_sz, i); - if (s == NULL) - fatalx("queue: bogus delete req"); - - queue_destroy(s); - incoming[i] = NULL; + switch (imsg->hdr.type) { + case IMSG_QUEUE_CREATE_MESSAGE: + ss.id = m->session_id; + ss.code = 250; + bzero(ss.u.msgid, sizeof ss.u.msgid); + if (m->flags & F_MESSAGE_ENQUEUED) + ret = enqueue_create_layout(ss.u.msgid); + else + ret = queue_create_incoming_layout(ss.u.msgid); + if (ret == 0) + ss.code = 421; + imsg_compose_event(iev, IMSG_QUEUE_CREATE_MESSAGE, 0, 0, -1, + &ss, sizeof ss); return; - case IMSG_QUEUE_OPEN: - /* - * Open the file that will hold mail content. - */ - memcpy(&i, imsg->data, sizeof i); - - s = table_lookup(incoming, incoming_sz, i); - if (s == NULL) - fatalx("queue: bogus open req"); - - fd = queue_be_content_open(s->content->id, 1); - if (fd < 0) - fatal("queue: content open error"); - - imsg_compose_event(iev, IMSG_QUEUE_OPEN, - imsg->hdr.peerid, 0, fd, NULL, 0); + case IMSG_QUEUE_REMOVE_MESSAGE: + if (m->flags & F_MESSAGE_ENQUEUED) + enqueue_delete_message(m->message_id); + else + queue_delete_incoming_message(m->message_id); return; - case IMSG_QUEUE_CLOSE: - /* - * Commit mail to queue: we take on responsibility for - * performing all requested actions on this content. - */ - memcpy(&i, imsg->data, sizeof i); - - s = table_lookup(incoming, incoming_sz, i); - if (s == NULL) - fatalx("queue: bogus commit req"); - - if (queue_detect_loop(s) < 0) { - error = S_MESSAGE_PERMFAILURE; - imsg_compose_event(iev, IMSG_QUEUE_CLOSE, - imsg->hdr.peerid, 0, -1, &error, sizeof error); - return; - } - - if (queue_be_commit(s->content->id) < 0) { - error = S_MESSAGE_TEMPFAILURE; - imsg_compose_event(iev, IMSG_QUEUE_CLOSE, - imsg->hdr.peerid, 0, -1, &error, sizeof error); - return; + case IMSG_QUEUE_COMMIT_MESSAGE: + ss.id = m->session_id; + if (m->flags & F_MESSAGE_ENQUEUED) { + if (enqueue_commit_message(m)) + env->stats->queue.inserts_local++; + else + ss.code = 421; + } else { + if (queue_commit_incoming_message(m)) + env->stats->queue.inserts_remote++; + else + ss.code = 421; } + imsg_compose_event(iev, IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, + &ss, sizeof ss); + return; - env->stats->queue.inserts++; - env->stats->queue.length++; - - incoming_schedule(s); - incoming[i] = NULL; - for (i = 0; i < s->nlocal; i++) - free(s->local[i]); - free(s->local); - free(s); - - error = 0; - imsg_compose_event(iev, IMSG_QUEUE_CLOSE, - imsg->hdr.peerid, 0, -1, &error, sizeof error); + case IMSG_QUEUE_MESSAGE_FILE: + ss.id = m->session_id; + if (m->flags & F_MESSAGE_ENQUEUED) + fd = enqueue_open_messagefile(m); + else + fd = queue_open_incoming_message_file(m); + if (fd == -1) + ss.code = 421; + imsg_compose_event(iev, IMSG_QUEUE_MESSAGE_FILE, 0, 0, fd, + &ss, sizeof ss); return; case IMSG_SMTP_ENQUEUE: - queue_bounce_init(imsg->hdr.peerid, imsg->fd); + queue_pass_to_runner(env, iev, imsg); return; } } if (iev->proc == PROC_LKA) { - switch (imsg->hdr.type) { - case IMSG_QUEUE_APPEND: - m = imsg->data; - - s = table_lookup(incoming, incoming_sz, m->queue_id); - if (s == NULL) - fatalx("queue: bogus append"); + m = imsg->data; - switch (m->recipient.rule.r_action) { - case A_MBOX: - case A_MAILDIR: - case A_EXT: - /* ?|from|to|user1|user2|path */ - if (m->recipient.rule.r_action == A_MBOX) - strlcpy(aux, "M|", sizeof aux); - else if (m->recipient.rule.r_action == A_MAILDIR) - strlcpy(aux, "D|", sizeof aux); - else - strlcpy(aux, "P|", sizeof aux); - if (m->sender.user[0] && m->sender.domain[0]) { - strlcat(aux, m->sender.user, sizeof aux); - strlcat(aux, "@", sizeof aux); - strlcat(aux, m->sender.domain, sizeof aux); - } - strlcat(aux, "|", sizeof aux); - strlcat(aux, m->session_rcpt.user, sizeof aux); - strlcat(aux, "@", sizeof aux); - strlcat(aux, m->session_rcpt.domain, sizeof aux); - strlcat(aux, "|", sizeof aux); - strlcat(aux, m->sender.pw_name, sizeof aux); - strlcat(aux, "|", sizeof aux); - strlcat(aux, m->recipient.pw_name, sizeof aux); - strlcat(aux, "|", sizeof aux); - strlcat(aux, m->recipient.rule.r_value.buffer, sizeof aux); - break; + switch (imsg->hdr.type) { + case IMSG_QUEUE_SUBMIT_ENVELOPE: + m->id = generate_uid(); + ss.id = m->session_id; - case A_FILENAME: - /* F|from|to|user1|user2|path */ - strlcpy(aux, "F|", sizeof aux); - if (m->sender.user[0] && m->sender.domain[0]) { - strlcat(aux, m->sender.user, sizeof aux); - strlcat(aux, "@", sizeof aux); - strlcat(aux, m->sender.domain, sizeof aux); - } - strlcat(aux, "|", sizeof aux); - strlcat(aux, m->session_rcpt.user, sizeof aux); - strlcat(aux, "@", sizeof aux); - strlcat(aux, m->session_rcpt.domain, sizeof aux); - strlcat(aux, "|", sizeof aux); - strlcat(aux, m->sender.pw_name, sizeof aux); - strlcat(aux, "|", sizeof aux); - strlcat(aux, SMTPD_USER, sizeof aux); - strlcat(aux, "|", sizeof aux); - strlcat(aux, m->recipient.u.filename, sizeof aux); - break; + if (IS_MAILBOX(m->recipient) || IS_EXT(m->recipient)) + m->type = T_MDA_MESSAGE; + else + m->type = T_MTA_MESSAGE; - case A_RELAY: - case A_RELAYVIA: - /* R|from|to|user|rcpt|via|port|ssl|cert|auth */ - strlcpy(aux, "R|", sizeof aux); - if (m->sender.user[0] && m->sender.domain[0]) { - strlcat(aux, m->sender.user, sizeof aux); - strlcat(aux, "@", sizeof aux); - strlcat(aux, m->sender.domain, sizeof aux); - } - strlcat(aux, "|", sizeof aux); - strlcat(aux, m->session_rcpt.user, sizeof aux); - strlcat(aux, "@", sizeof aux); - strlcat(aux, m->session_rcpt.domain, sizeof aux); - strlcat(aux, "|", sizeof aux); - strlcat(aux, m->sender.pw_name, sizeof aux); - strlcat(aux, "|", sizeof aux); - strlcat(aux, m->recipient.user, sizeof aux); - strlcat(aux, "@", sizeof aux); - strlcat(aux, m->recipient.domain, sizeof aux); - strlcat(aux, "|", sizeof aux); - if (m->recipient.rule.r_action == A_RELAYVIA) - strlcat(aux, m->recipient.rule.r_value.relayhost.hostname, sizeof aux); - strlcat(aux, "|", sizeof aux); - if (m->recipient.rule.r_value.relayhost.port) { - char port[10]; - snprintf(port, sizeof port, "%d", ntohs(m->recipient.rule.r_value.relayhost.port)); - strlcat(aux, port, sizeof aux); - } - strlcat(aux, "|", sizeof aux); - switch (m->recipient.rule.r_value.relayhost.flags & F_SSL) { - case F_SSL: - strlcat(aux, "ssl", sizeof aux); - break; - case F_SMTPS: - strlcat(aux, "smtps", sizeof aux); - break; - case F_STARTTLS: - strlcat(aux, "starttls", sizeof aux); - break; - } - strlcat(aux, "|", sizeof aux); - strlcat(aux, m->recipient.rule.r_value.relayhost.cert, sizeof aux); - strlcat(aux, "|", sizeof aux); - if (m->recipient.rule.r_value.relayhost.flags & F_AUTH) - strlcat(aux, "secrets", sizeof aux); - break; + /* Write to disk */ + if (m->flags & F_MESSAGE_ENQUEUED) + ret = enqueue_record_envelope(m); + else + ret = queue_record_incoming_envelope(m); - default: - fatalx("queue: bad r_action"); + if (ret == 0) { + ss.code = 421; + imsg_compose_event(env->sc_ievs[PROC_SMTP], + IMSG_QUEUE_TEMPFAIL, 0, 0, -1, &ss, + sizeof ss); } + return; - if (queue_append(s, aux) < 0) - error = S_MESSAGE_TEMPFAILURE; - else - error = 0; - - imsg_compose_event(iev, IMSG_QUEUE_APPEND, - imsg->hdr.peerid, 0, -1, &error, sizeof error); + case IMSG_QUEUE_COMMIT_ENVELOPES: + ss.id = m->session_id; + ss.code = 250; + imsg_compose_event(env->sc_ievs[PROC_SMTP], + IMSG_QUEUE_COMMIT_ENVELOPES, 0, 0, -1, &ss, + sizeof ss); return; } } - if (iev->proc == PROC_MDA) { + if (iev->proc == PROC_RUNNER) { + /* forward imsgs from runner on its behalf */ + imsg_compose_event(env->sc_ievs[imsg->hdr.peerid], imsg->hdr.type, + 0, imsg->hdr.pid, imsg->fd, (char *)imsg->data, + imsg->hdr.len - sizeof imsg->hdr); + return; + } + + if (iev->proc == PROC_MTA) { switch (imsg->hdr.type) { - case IMSG_BATCH_UPDATE: - update = imsg->data; - queue_update(Q_LOCAL, imsg->hdr.peerid, update->id, - update->data); + case IMSG_QUEUE_MESSAGE_FD: + b = imsg->data; + fd = queue_open_message_file(b->message_id); + imsg_compose_event(iev, IMSG_QUEUE_MESSAGE_FD, 0, 0, + fd, b, sizeof *b); return; + case IMSG_QUEUE_MESSAGE_UPDATE: case IMSG_BATCH_DONE: - queue_done(Q_LOCAL, imsg->hdr.peerid); + queue_pass_to_runner(env, iev, imsg); return; - } } - if (iev->proc == PROC_MTA) { + if (iev->proc == PROC_MDA) { switch (imsg->hdr.type) { - case IMSG_BATCH_UPDATE: - update = imsg->data; - queue_update(Q_RELAY, imsg->hdr.peerid, update->id, - update->data); - return; - - case IMSG_BATCH_DONE: - queue_done(Q_RELAY, imsg->hdr.peerid); + case IMSG_QUEUE_MESSAGE_UPDATE: + case IMSG_MDA_SESS_NEW: + queue_pass_to_runner(env, iev, imsg); return; } } @@ -356,59 +193,25 @@ queue_imsg(struct smtpd *env, struct imsgev *iev, struct imsg *imsg) if (iev->proc == PROC_CONTROL) { switch (imsg->hdr.type) { case IMSG_QUEUE_PAUSE_LOCAL: - runqs[Q_LOCAL].max = 0; - queue_sleep(Q_LOCAL); - return; - - case IMSG_QUEUE_PAUSE_RELAY: - runqs[Q_RELAY].max = 0; - queue_sleep(Q_RELAY); - return; - + case IMSG_QUEUE_PAUSE_OUTGOING: case IMSG_QUEUE_RESUME_LOCAL: - runqs[Q_LOCAL].max = env->sc_maxconn; - queue_sleep(Q_LOCAL); - return; - - case IMSG_QUEUE_RESUME_RELAY: - runqs[Q_RELAY].max = env->sc_maxconn; - queue_sleep(Q_RELAY); - return; - + case IMSG_QUEUE_RESUME_OUTGOING: case IMSG_QUEUE_SCHEDULE: - memcpy(&content_id, imsg->data, sizeof content_id); - error = queue_control(content_id, 1); - if (error) - log_warnx("schedule request failed"); - else { - queue_sleep(Q_LOCAL); - queue_sleep(Q_RELAY); - queue_sleep(Q_BOUNCE); - } - imsg_compose_event(iev, IMSG_QUEUE_SCHEDULE, - imsg->hdr.peerid, 0, -1, &error, sizeof error); - return; - case IMSG_QUEUE_REMOVE: - memcpy(&content_id, imsg->data, sizeof content_id); - error = queue_control(content_id, 0); - if (error) - log_warnx("remove request failed"); - else { - queue_sleep(Q_LOCAL); - queue_sleep(Q_RELAY); - queue_sleep(Q_BOUNCE); - } - imsg_compose_event(iev, IMSG_QUEUE_REMOVE, - imsg->hdr.peerid, 0, -1, &error, sizeof error); + queue_pass_to_runner(env, iev, imsg); return; } } if (iev->proc == PROC_PARENT) { switch (imsg->hdr.type) { + case IMSG_PARENT_ENQUEUE_OFFLINE: + queue_pass_to_runner(env, iev, imsg); + return; + case IMSG_CTL_VERBOSE: log_verbose(*(int *)imsg->data); + queue_pass_to_runner(env, iev, imsg); return; } } @@ -416,141 +219,12 @@ queue_imsg(struct smtpd *env, struct imsgev *iev, struct imsg *imsg) fatalx("queue_imsg: unexpected imsg"); } -int -queue_append(struct incoming *s, char *auxraw) -{ - struct batch *batch; - struct action *action; - char *copy; - struct aux aux; - u_int64_t action_id; - - log_debug("aux %s", auxraw); - - copy = strdup(auxraw); - if (copy == NULL) - fatal(NULL); - auxsplit(&aux, copy); - - /* remember local recipients for delivered-to: loop detection */ - if (aux.mode[0] != 'R') { - if (s->nlocal == s->local_sz) { - s->local_sz *= 2; - s->local = realloc(s->local, ++s->local_sz * - sizeof s->local[0]); - if (s->local == NULL) - fatal(NULL); - } - /* - * XXX: using rcpt_to is wrong because it's unexpanded address - * as seen in RCPT TO; must use expanded address in the form - * <user>@<domain>, but since lka expands local addresses to - * just <user> this is currently undoable. - */ - s->local[s->nlocal] = strdup(aux.rcpt_to); - if (s->local[s->nlocal] == NULL) - fatal(NULL); - s->nlocal++; - } - - /* assign batch */ - if (aux.mode[0] != 'R') - batch = incoming_batch(s, ""); - else if (aux.relay_via[0]) - batch = incoming_batch(s, aux.relay_via); - else - batch = incoming_batch(s, strchr(aux.rcpt, '@')); - - if (batch == NULL) - fatal(NULL); - - free(copy); - - if (queue_be_action_new(s->content->id, &action_id, auxraw) < 0) - return -1; - - action = action_alloc(action_id); - if (action == NULL) - fatal(NULL); - - action_insert(action, batch); - - return 0; -} - void -queue_destroy(struct incoming *s) +queue_pass_to_runner(struct smtpd *env, struct imsgev *iev, struct imsg *imsg) { - struct batch *batch; - struct action *action; - u_int rq; - int i; - - for (rq = 0; rq < nitems(s->batches); rq++) { - while ((batch = SLIST_FIRST(&s->batches[rq]))) { - SLIST_REMOVE_HEAD(&s->batches[rq], entry); - while ((action = SLIST_FIRST(&batch->actions))) { - SLIST_REMOVE_HEAD(&batch->actions, entry); - queue_be_action_delete(s->content->id, - action->id); - action_free(action); - } - free(batch); - } - } - queue_be_content_delete(s->content->id); - - free(s->content); - for (i = 0; i < s->nlocal; i++) - free(s->local[i]); - free(s->local); - free(s); -} - -/* - * Walk all runqueues to schedule or remove requested content. - */ -int -queue_control(u_int64_t content_id, int schedule) -{ - struct batch *b, *next; - struct action *action; - struct action_be a; - struct aux aux; - u_int rq, n; - - n = 0; - for (rq = 0; rq < nitems(runqs); rq++) { - for (b = SLIST_FIRST(&runqs[rq].head); b; b = next) { - next = SLIST_NEXT(b, entry); - if (content_id && b->content->id != content_id) - continue; - n++; - SLIST_REMOVE(&runqs[rq].head, b, batch, entry); - if (schedule) { - time(&b->retry); - queue_schedule(rq, b); - continue; - } - while ((action = SLIST_FIRST(&b->actions))) { - SLIST_REMOVE_HEAD(&b->actions, entry); - if (queue_be_action_read(&a, b->content->id, - action->id) < 0) - fatal("queue: action read error"); - auxsplit(&aux, a.aux); - log_info("%s: to=%s, delay=%d, stat=Removed", - queue_be_decode(b->content->id), - rcpt_pretty(&aux), time(NULL) - a.birth); - queue_be_action_delete(b->content->id, - action->id); - queue_mem_content_unref(b->content); - action_free(action); - } - free(b); - } - } - - return (n > 0 ? 0 : -1); + imsg_compose_event(env->sc_ievs[PROC_RUNNER], imsg->hdr.type, + iev->proc, imsg->hdr.pid, imsg->fd, imsg->data, + imsg->hdr.len - sizeof imsg->hdr); } void @@ -569,16 +243,26 @@ queue_sig_handler(int sig, short event, void *p) void queue_shutdown(void) { - log_info("queue exiting"); + log_info("queue handler exiting"); _exit(0); } +void +queue_setup_events(struct smtpd *env) +{ +} + +void +queue_disable_events(struct smtpd *env) +{ +} + pid_t queue(struct smtpd *env) { pid_t pid; struct passwd *pw; - u_int rq; + struct event ev_sigint; struct event ev_sigterm; @@ -588,7 +272,8 @@ queue(struct smtpd *env) { PROC_SMTP, imsg_dispatch }, { PROC_MDA, imsg_dispatch }, { PROC_MTA, imsg_dispatch }, - { PROC_LKA, imsg_dispatch } + { PROC_LKA, imsg_dispatch }, + { PROC_RUNNER, imsg_dispatch } }; switch (pid = fork()) { @@ -617,38 +302,9 @@ queue(struct smtpd *env) setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid)) fatal("queue: cannot drop privileges"); - /* - * Queue opens fds for four purposes: smtp, mta, mda, and bounces. - * Therefore, use all available fd space and set the maxconn (=max - * session count for each of these tasks) to a quarter of this value. - */ - fdlimit(1.0); - if ((env->sc_maxconn = availdesc() / 4) < 1) - fatalx("queue: fd starvation"); - imsg_callback = queue_imsg; event_init(); - config_pipes(env, peers, nitems(peers)); - config_peers(env, peers, nitems(peers)); - - for (rq = 0; rq < nitems(runqs); rq++) { - SLIST_INIT(&runqs[rq].head); - runqs[rq].env = env; - runqs[rq].max = env->sc_maxconn; - } - runqs[Q_LOCAL].name = "Q_LOCAL"; - runqs[Q_RELAY].name = "Q_RELAY"; - runqs[Q_BOUNCE].name = "Q_BOUNCE"; - - /* bouncing costs 2 fds: file and socket */ - runqs[Q_BOUNCE].max /= 2; - - queue_mem_init(env); - queue_sleep(Q_LOCAL); - queue_sleep(Q_RELAY); - queue_sleep(Q_BOUNCE); - signal_set(&ev_sigint, SIGINT, queue_sig_handler, env); signal_set(&ev_sigterm, SIGTERM, queue_sig_handler, env); signal_add(&ev_sigint, NULL); @@ -656,876 +312,66 @@ queue(struct smtpd *env) signal(SIGPIPE, SIG_IGN); signal(SIGHUP, SIG_IGN); - if (event_dispatch() < 0) - fatal("event_dispatch"); - queue_shutdown(); - - return (0); -} - -void -queue_mem_init(struct smtpd *env) -{ - SLIST_HEAD(,batch) bhash[4096]; - void **btab; - struct content *content; - struct action *action; - struct batch *batch; - char *sortkey; - struct action_be a; - struct aux aux; - int btab_sz, nbtab, rq, i; - - for (i = 0; i < 4096; i++) - SLIST_INIT(&bhash[i]); - btab = NULL; - btab_sz = 0; - nbtab = 0; - /* - * Sort actions into batches. - */ - for (;;) { - if (queue_be_getnext(&a) < 0) - fatal("queue: backend error"); - if (a.action_id == 0) - break; - auxsplit(&aux, a.aux); - - /* - * Assignment to batch is based on the sortkey: - * B=<content_id> for bounced mail - * R=<domain> for relayed mail - * L=<action_id> for local mail - */ - if (a.status[0] == '5' || a.status[0] == '6') - asprintf(&sortkey, "B=%s", queue_be_decode(a.content_id)); - else if (aux.mode[0] == 'R') { - if (aux.relay_via[0]) - asprintf(&sortkey, "R=%s", aux.relay_via); - else - asprintf(&sortkey, "R=%s", strchr(aux.rcpt, '@')); - } else - asprintf(&sortkey, "L=%s", queue_be_decode(a.action_id)); - - content = NULL; - SLIST_FOREACH(batch, &bhash[a.content_id & 4095], entry) { - if (batch->content->id == a.content_id) { - content = batch->content; - if (strcmp(batch->sortkey, sortkey) == 0) - break; - } - } - - if (batch == NULL) { - if (content == NULL) { - content = content_alloc(a.content_id); - if (content == NULL) - fatal("queue_mem_init"); - env->stats->queue.length++; - } - - batch = batch_alloc(content, sortkey); - if (batch == NULL) - fatal("queue_mem_init"); - - if (*sortkey == 'B') - rq = Q_BOUNCE; - else if (*sortkey == 'R') - rq = Q_RELAY; - else - rq = Q_LOCAL; - - batch->retry = queue_retry(rq, a.birth, a.birth); - while (batch->retry < time(NULL)) - batch->retry = queue_retry(rq, a.birth, - batch->retry); - - if (batch->retry > a.birth + env->sc_qexpire) - batch->retry = NO_RETRY_EXPIRED; - - SLIST_INSERT_HEAD(&bhash[a.content_id & 4095], batch, - entry); - if (nbtab == btab_sz) { - btab_sz *= 2; - btab = realloc(btab, ++btab_sz * sizeof *btab); - if (btab == NULL) - fatal("queue_mem_init"); - } - btab[nbtab] = batch; - nbtab++; - } - - action = action_alloc(a.action_id); - if (action == NULL) - fatal("queue_mem_init"); - - action_insert(action, batch); - - free(sortkey); - } - - /* - * Add batches to schedule. + * queue opens fds for four purposes: smtp, mta, mda, and bounces. + * Therefore, use all available fd space and set the maxconn (=max + * session count for mta and mda) to a quarter of this value. */ - qsort(btab, nbtab, sizeof *btab, batchsort); - for (i = 0; i < nbtab; i++) { - batch = btab[i]; - if (batch->sortkey[0] == 'B') - rq = Q_BOUNCE; - else if (batch->sortkey[0] == 'R') - rq = Q_RELAY; - else - rq = Q_LOCAL; - queue_schedule(rq, batch); - } - - free(btab); -} - -int -batchsort(const void *x, const void *y) -{ - const struct batch *b1 = x, *b2 = y; - return (b1->retry < b2->retry ? -1 : b1->retry > b2->retry); -} - -void -queue_mem_content_unref(struct content *content) -{ - content->ref--; - if (content->ref < 0) - fatalx("queue: bad refcount"); - else if (content->ref == 0) { - queue_be_content_delete(content->id); - runqs[Q_LOCAL].env->stats->queue.length--; - } -} - -void -queue_send(int fd, short event, void *p) -{ - struct smtpd *env; - struct batch *batch; - struct action *action; - struct action_be a; - int rq, i, to; - time_t now; - - rq = (struct queue *)p - runqs; - env = runqs[rq].env; - time(&now); - i = -1; - - while ((batch = SLIST_FIRST(&runqs[rq].head))) { - if (batch->retry > now || runqs[rq].sessions >= runqs[rq].max) - break; - - SLIST_REMOVE_HEAD(&runqs[rq].head, entry); - i = table_alloc(&runqs[rq].session, &runqs[rq].session_sz); - runqs[rq].session[i] = batch; - runqs[rq].sessions++; - - log_debug("%s: %d: start %s", runqs[rq].name, i, - queue_be_decode(batch->content->id)); - - if (batch->retry == NO_RETRY_EXPIRED) { - log_debug("%s: %d: expire", runqs[rq].name, i); - queue_expire(batch); - queue_done(rq, i); - continue; - } - - if (rq == Q_BOUNCE) { - log_debug("%s: %d: socket request", runqs[rq].name, i); - imsg_compose_event(env->sc_ievs[PROC_SMTP], - IMSG_SMTP_ENQUEUE, i, 0, -1, NULL, 0); - continue; - } - - log_debug("%s: %d: send", runqs[rq].name, i); - - fd = queue_be_content_open(batch->content->id, 0); - if (fd < 0) - fatal("queue: content open error"); - - if (rq == Q_LOCAL) - to = PROC_MDA; - else - to = PROC_MTA; - - imsg_compose_event(env->sc_ievs[to], IMSG_BATCH_CREATE, i, 0, - fd, &batch->content->id, sizeof batch->content->id); - - while ((action = SLIST_FIRST(&batch->actions))) { - SLIST_REMOVE_HEAD(&batch->actions, entry); - - if (queue_be_action_read(&a, batch->content->id, - action->id) < 0) - fatal("queue: action read error"); - - action = action_grow(action, a.aux); - if (action == NULL) - fatal(NULL); - - imsg_compose_event(env->sc_ievs[to], IMSG_BATCH_APPEND, - i, 0, -1, action, sizeof *action + strlen(a.aux)); - - action_free(action); - } - - imsg_compose_event(env->sc_ievs[to], IMSG_BATCH_CLOSE, i, 0, -1, - &a.birth, sizeof a.birth); - } - - /* Sanity check: were we called for no good reason? */ - if (i == -1) - fatalx("queue_send: empty run"); - - queue_sleep(rq); -} - -void -queue_expire(struct batch *batch) -{ - struct action *action, *fail; - struct action_be a; - struct aux aux; - time_t birth; - int error; - - action = SLIST_FIRST(&batch->actions); - if (queue_be_action_read(&a, batch->content->id, action->id) < 0) - fatal("queue: action read error"); - - auxsplit(&aux, a.aux); - birth = a.birth; - - if (a.status[0] == '5' || a.status[0] == '6') { - log_warnx("%s: to=%s, delay=%d, stat=Expired (no bounce due " - "to: larval bounce)", - queue_be_decode(batch->content->id), aux.mail_from, - time(NULL) - birth); - - while ((action = SLIST_FIRST(&batch->actions))) { - SLIST_REMOVE_HEAD(&batch->actions, entry); - queue_be_action_delete(batch->content->id, action->id); - queue_mem_content_unref(batch->content); - action_free(action); - } - - return; - } - - if (aux.mail_from[0] == '\0') { - while ((action = SLIST_FIRST(&batch->actions))) { - SLIST_REMOVE_HEAD(&batch->actions, entry); - - if (queue_be_action_read(&a, batch->content->id, - action->id) < 0) - fatal("queue: action read error"); - auxsplit(&aux, a.aux); - - log_warnx("%s: to=%s, delay=%d, stat=Expired (no bounce " - "due to: double bounce)", - queue_be_decode(batch->content->id), - rcpt_pretty(&aux), time(NULL) - birth); - - queue_be_action_delete(batch->content->id, action->id); - queue_mem_content_unref(batch->content); - action_free(action); - } - return; - } - - SLIST_FOREACH(action, &batch->actions, entry) - if (queue_be_action_status(batch->content->id, action->id, - "600 Message expired after too many delivery attempts") < 0) - break; - - if (action) { - fail = action; - error = errno; - } else { - fail = NULL; - error = 0; - } - - while ((action = SLIST_FIRST(&batch->actions))) { - if (action == fail) - break; - SLIST_REMOVE_HEAD(&batch->actions, entry); - - if (queue_be_action_read(&a, batch->content->id, - action->id) < 0) - fatal("queue: action read error"); - auxsplit(&aux, a.aux); - - log_info("%s: to=%s, delay=%d, stat=Expired", - queue_be_decode(batch->content->id), rcpt_pretty(&aux), - time(NULL) - birth); - - SLIST_INSERT_HEAD(&batch->content->actions, action, entry); - queue_bounce_wait(batch->content); - } - - while ((action = SLIST_FIRST(&batch->actions))) { - SLIST_REMOVE_HEAD(&batch->actions, entry); - - if (queue_be_action_read(&a, batch->content->id, - action->id) < 0) - fatal("queue: action read error"); - auxsplit(&aux, a.aux); - - log_warnx("%s: to=%s, delay=%d, stat=Expired (no bounce due " - "to: %s)", - queue_be_decode(batch->content->id), rcpt_pretty(&aux), - time(NULL) - birth, strerror(error)); - - queue_be_action_delete(batch->content->id, action->id); - queue_mem_content_unref(batch->content); - action_free(action); - } -} - -void -queue_update(int rq, int i, u_int64_t action_id, char *new_status) -{ - struct batch *batch; - struct action *action; - struct action_be a; - struct aux aux; - - batch = table_lookup(runqs[rq].session, runqs[rq].session_sz, i); - if (batch == NULL) - fatalx("queue: bogus update"); - - if (*new_status == '2') { - queue_be_action_delete(batch->content->id, action_id); - queue_mem_content_unref(batch->content); - return; - } - - action = malloc(sizeof *action); - if (action == NULL) - fatal(NULL); - action->id = action_id; - - if (*new_status == '5' || *new_status == '6') { - if (queue_be_action_read(&a, batch->content->id, action_id) < 0) - fatal("queue: queue read error"); - - auxsplit(&aux, a.aux); - - if (aux.mail_from[0] == '\0') { - log_warnx("%s: bounce recipient %s not contactable, " - "bounce dropped", - queue_be_decode(batch->content->id), aux.rcpt_to); - queue_be_action_delete(batch->content->id, action_id); - queue_mem_content_unref(batch->content); - action_free(action); - return; - } - - if (queue_be_action_status(batch->content->id, action_id, - new_status) < 0) { - log_warn("%s: recipient %s not contactable, bounce not " - "created due to queue error", - queue_be_decode(batch->content->id), aux.rcpt_to); - queue_be_action_delete(batch->content->id, action_id); - queue_mem_content_unref(batch->content); - action_free(action); - return; - } - - SLIST_INSERT_HEAD(&batch->content->actions, action, entry); - - queue_bounce_wait(batch->content); - } else { - queue_be_action_status(batch->content->id, action_id, new_status); - SLIST_INSERT_HEAD(&batch->actions, action, entry); - } -} - -void -queue_done(int rq, int i) -{ - struct action_be a; - struct batch *batch; - struct action *action; - - /* Take batch off the session table. */ - batch = table_lookup(runqs[rq].session, runqs[rq].session_sz, i); - if (batch == NULL) - fatalx("queue: bogus batch"); - runqs[rq].session[i] = NULL; - runqs[rq].sessions--; - - log_debug("%s: %d: done", runqs[rq].name, i); - - /* All actions sent? */ - if (SLIST_EMPTY(&batch->actions)) { - if (batch->content->ref == 0) { - free(batch->content->ev); - free(batch->content); - } - free(batch); - } else { - /* Batch has actions with temporary errors. */ - action = SLIST_FIRST(&batch->actions); - if (queue_be_action_read(&a, batch->content->id, - action->id) < 0) - fatal("queue: action read error"); - batch->retry = queue_retry(rq, a.birth, batch->retry); - if (batch->retry > a.birth + runqs[rq].env->sc_qexpire) - batch->retry = NO_RETRY_EXPIRED; - queue_schedule(rq, batch); - } - - queue_sleep(rq); -} - -/* - * Insert batch into runqueue in retry time order. - */ -void -queue_schedule(int rq, struct batch *batch) -{ - struct batch *b, *prev; - - prev = NULL; - - SLIST_FOREACH(b, &runqs[rq].head, entry) { - if (b->retry >= batch->retry) { - if (prev) - SLIST_INSERT_AFTER(prev, batch, entry); - else - SLIST_INSERT_HEAD(&runqs[rq].head, batch, - entry); - break; - } - prev = b; - } - - if (b == NULL) { - if (prev) - SLIST_INSERT_AFTER(prev, batch, entry); - else - SLIST_INSERT_HEAD(&runqs[rq].head, batch, entry); - } -} - -void -queue_sleep(int rq) -{ - struct timeval tv; - struct batch *next; - time_t now; + fdlimit(1.0); + if ((env->sc_maxconn = availdesc() / 4) < 1) + fatalx("runner: fd starvation"); - evtimer_del(&runqs[rq].ev); + config_pipes(env, peers, nitems(peers)); + config_peers(env, peers, nitems(peers)); - if (runqs[rq].sessions >= runqs[rq].max) - return; + queue_purge(PATH_INCOMING); + queue_purge(PATH_ENQUEUE); - next = SLIST_FIRST(&runqs[rq].head); - if (next == NULL) - return; + queue_setup_events(env); - time(&now); - if (next->retry < now) - tv.tv_sec = 0; - else - tv.tv_sec = next->retry - now; - tv.tv_usec = 0; - - log_debug("%s: sleep %lus", runqs[rq].name, tv.tv_sec); - - evtimer_set(&runqs[rq].ev, queue_send, &runqs[rq]); - evtimer_add(&runqs[rq].ev, &tv); -} - -/* - * Qmail-like retry schedule. - * - * Local deliveries are tried more often than remote. - */ -time_t -queue_retry(int rq, time_t birth, time_t last) -{ - int n; - - if (last - birth < 0) - n = 0; - else if (rq == Q_RELAY) - n = sqrt(last - birth) + 20; - else - n = sqrt(last - birth) + 10; - - return birth + n * n; -} - -/* - * Wait for permanent failures against this content for few more seconds. - * If none arrive, combine them into single batch and put on Q_BOUNCE - * runqueue. If one does arrive, append it, and restart the timer. - */ -void -queue_bounce_wait(struct content *content) -{ - struct timeval tv; - - if (content->ev == NULL) { - content->ev = calloc(1, sizeof *content->ev); - if (content->ev == NULL) - fatal(NULL); - } - tv.tv_sec = 3; - tv.tv_usec = 0; - evtimer_del(content->ev); - evtimer_set(content->ev, queue_bounce_schedule, content); - evtimer_add(content->ev, &tv); -} - -void -queue_bounce_schedule(int fd, short event, void *p) -{ - struct content *content = p; - struct batch *batch; - struct action *action; - - free(content->ev); - content->ev = NULL; - - batch = malloc(sizeof *batch); - if (batch == NULL) - fatal(NULL); - SLIST_INIT(&batch->actions); - batch->content = content; - while ((action = SLIST_FIRST(&content->actions))) { - SLIST_REMOVE_HEAD(&content->actions, entry); - SLIST_INSERT_HEAD(&batch->actions, action, entry); - } - time(&batch->retry); - queue_schedule(Q_BOUNCE, batch); - queue_sleep(Q_BOUNCE); -} - -void -queue_bounce_init(int i, int sock) -{ - struct smtpd *env = runqs[Q_BOUNCE].env; - struct batch *batch; - struct bounce *s; - struct action *action; - struct action_be a; - struct aux aux; - int fd, header; - - log_debug("%s: %d: init", runqs[Q_BOUNCE].name, i); - - batch = table_lookup(runqs[Q_BOUNCE].session, - runqs[Q_BOUNCE].session_sz, i); - if (batch == NULL) - fatalx("queue: bogus bounce batch"); - - if (sock < 0) { - queue_done(Q_BOUNCE, i); - return; - } - - fd = queue_be_content_open(batch->content->id, 0); - if (fd < 0) - fatal("queue: content open error"); - - s = calloc(1, sizeof *s); - if (s == NULL) - fatal(NULL); - s->batch = batch; - s->pcb = client_init(sock, fd, env->sc_hostname, 1); - s->id = i; - client_sender(s->pcb, ""); - client_ssl_optional(s->pcb); - - header = 0; - SLIST_FOREACH(action, &batch->actions, entry) { - if (queue_be_action_read(&a, batch->content->id, - action->id) < 0) - fatal("queue: backend read error"); - auxsplit(&aux, a.aux); - if (header == 0) { - client_rcpt(s->pcb, "%s", aux.mail_from); - client_printf(s->pcb, - "From: Mailer Daemon <MAILER-DAEMON@%s>\n" - "To: %s\n" - "Subject: Delivery status notification\n" - "Date: %s\n" - "\n" - "This is automated mail delivery notification, please DO NOT REPLY.\n" - "An error has occurred while attempting to deliver your mail to the\n" - "following recipients:\n" - "\n", - env->sc_hostname, aux.mail_from, - time_to_text(time(NULL))); - header = 1; - } - if (strlen(a.status) > 4 && (a.status[0] == '1' || a.status[0] == '6')) - a.status += 4; - client_printf(s->pcb, "%s: %s\n\n", aux.rcpt_to, a.status); - } - client_printf(s->pcb, "Below is a copy of your mail:\n\n"); - - session_socket_blockmode(sock, BM_NONBLOCK); - event_set(&s->ev, sock, EV_READ|EV_WRITE, queue_bounce_event, s); - event_add(&s->ev, &s->pcb->timeout); -} - -void -queue_bounce_event(int fd, short event, void *p) -{ - struct action *action; - struct bounce *s = p; - char *status = NULL; - - if (event & EV_TIMEOUT) { - status = "100 timeout"; - goto out; - } - - switch (client_talk(s->pcb, event & EV_WRITE)) { - case CLIENT_STOP_WRITE: - goto ro; - case CLIENT_WANT_WRITE: - goto rw; - case CLIENT_RCPT_FAIL: - status = s->pcb->reply; - break; - case CLIENT_DONE: - status = s->pcb->status; - break; - default: - fatalx("queue: bad client_talk"); - } - -out: - log_debug("%s: %d: last event", runqs[Q_BOUNCE].name, s->id); - - if (*status == '2' || *status == '5' || *status == '6') { - while ((action = SLIST_FIRST(&s->batch->actions))) { - SLIST_REMOVE_HEAD(&s->batch->actions, entry); - queue_be_action_delete(s->batch->content->id, - action->id); - queue_mem_content_unref(s->batch->content); - action_free(action); - } - } - queue_done(Q_BOUNCE, s->id); - client_close(s->pcb); - free(s); - return; - -ro: - event_set(&s->ev, fd, EV_READ, queue_bounce_event, s); - event_add(&s->ev, &s->pcb->timeout); - return; - -rw: - event_set(&s->ev, fd, EV_READ|EV_WRITE, queue_bounce_event, s); - event_add(&s->ev, &s->pcb->timeout); -} - -int -queue_detect_loop(struct incoming *s) -{ - FILE *fp; - char *buf, *lbuf; - size_t len, received; - int fd, i; - - fd = queue_be_content_open(s->content->id, 0); - if (fd < 0) - fatal("queue_detect_loop: content open error"); - fp = fdopen(fd, "r"); - if (fp == NULL) - fatal("queue_detect_loop: fdopen"); - - received = 0; - lbuf = NULL; - - while ((buf = fgetln(fp, &len))) { - free(lbuf); - lbuf = NULL; - - if (buf[len - 1] == '\n') { - buf[len - 1] = '\0'; - len--; - } else { - /* EOF without EOL, copy and add the NUL */ - if ((lbuf = malloc(len + 1)) == NULL) - fatal(NULL); - memcpy(lbuf, buf, len); - lbuf[len] = '\0'; - buf = lbuf; - } - - if (*buf == '\0') { - buf = NULL; - break; - } - - if (strncasecmp(buf, "Received:", 9) == 0) { - received++; - if (received >= MAX_HOPS_COUNT) - break; - } else if (strncasecmp(buf, "Delivered-To:", 13) == 0) { - buf += 13; - while (isspace(*buf)) - buf++; - buf[strcspn(buf, " \t")] = '\0'; - for (i = 0; i < s->nlocal; i++) - if (strcmp(s->local[i], buf) == 0) - break; - if (i < s->nlocal) - break; - } - } - free(lbuf); - fclose(fp); - - return (buf == NULL ? 0 : -1); -} - -struct incoming * -incoming_alloc(u_int64_t content_id) -{ - struct incoming *s; - u_int rq; - - s = calloc(1, sizeof *s); - if (s == NULL) - return NULL; - for (rq = 0; rq < nitems(s->batches); rq++) - SLIST_INIT(&s->batches[rq]); - - s->content = content_alloc(content_id); - if (s->content == NULL) { - free(s); - return NULL; - } + if (event_dispatch() < 0) + fatal("event_dispatch"); + queue_shutdown(); - return s; + return (0); } struct batch * -incoming_batch(struct incoming *s, char *sortkey) -{ - struct batch *batch; - u_int rq; - - if (*sortkey) { - rq = Q_RELAY; - SLIST_FOREACH(batch, &s->batches[rq], entry) - if (strcmp(batch->sortkey, sortkey) == 0) - break; - } else { - rq = Q_LOCAL; - batch = NULL; - } - - if (batch == NULL) { - batch = batch_alloc(s->content, sortkey); - if (batch == NULL) - return NULL; - SLIST_INSERT_HEAD(&s->batches[rq], batch, entry); - } - - return batch; -} - -void -incoming_schedule(struct incoming *s) -{ - struct batch *batch; - u_int rq; - - for (rq = 0; rq < nitems(s->batches); rq++) { - while ((batch = SLIST_FIRST(&s->batches[rq]))) { - SLIST_REMOVE_HEAD(&s->batches[rq], entry); - batch->retry = RETRY_NOW; - queue_schedule(rq, batch); - } - } - - queue_sleep(Q_LOCAL); - queue_sleep(Q_RELAY); -} - -struct content * -content_alloc(u_int64_t content_id) +batch_by_id(struct smtpd *env, u_int64_t id) { - struct content *content; + struct batch lookup; - content = calloc(1, sizeof *content); - if (content == NULL) - return NULL; - - content->id = content_id; - - return content; + lookup.id = id; + return SPLAY_FIND(batchtree, &env->batch_queue, &lookup); } -struct batch * -batch_alloc(struct content *content, char *sortkey) -{ - struct batch *batch; - - batch = calloc(1, sizeof *batch + strlen(sortkey)); - if (batch == NULL) - return NULL; - - SLIST_INIT(&batch->actions); - batch->content = content; - strlcpy(batch->sortkey, sortkey, strlen(sortkey) + 1); - return batch; -} - -struct action * -action_alloc(u_int64_t action_id) +void +queue_purge(char *queuepath) { - struct action *action; + char path[MAXPATHLEN]; + struct qwalk *q; - action = malloc(sizeof *action); - if (action == NULL) - return NULL; + q = qwalk_new(queuepath); - action->id = action_id; + while (qwalk(q, path)) + queue_delete_layout_message(queuepath, basename(path)); - return action; + qwalk_close(q); } void -action_free(struct action *action) +queue_submit_envelope(struct smtpd *env, struct message *message) { - free(action); + imsg_compose_event(env->sc_ievs[PROC_QUEUE], + IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, + message, sizeof(struct message)); } void -action_insert(struct action *action, struct batch *batch) +queue_commit_envelopes(struct smtpd *env, struct message *message) { - SLIST_INSERT_HEAD(&batch->actions, action, entry); - batch->content->ref++; -} - -struct action * -action_grow(struct action *action, char *aux) -{ - struct action *new; - - new = realloc(action, sizeof *new + strlen(aux)); - if (new == NULL) - return NULL; - - strlcpy(new->data, aux, strlen(aux) + 1); - - return new; + imsg_compose_event(env->sc_ievs[PROC_QUEUE], + IMSG_QUEUE_COMMIT_ENVELOPES, 0, 0, -1, + message, sizeof(struct message)); } |