diff options
author | Gilles Chehade <gilles@cvs.openbsd.org> | 2013-01-26 09:37:25 +0000 |
---|---|---|
committer | Gilles Chehade <gilles@cvs.openbsd.org> | 2013-01-26 09:37:25 +0000 |
commit | 52e93b0e61fd0a116dbb373054e2cd0ea3bfcf39 (patch) | |
tree | 41934d0fc43bfebf55ba5a199e0d699adf24aff1 /usr.sbin/smtpd/queue.c | |
parent | 3b78bd2481525635417ca0fc75396ef754c09171 (diff) |
Sync with our smtpd repo:
* first bricks of ldap and sqlite support (not finished but both working)
* new table API to replace map API, all lookups are done through tables
* improved handling of temporary errors throughout the daemon
* improved scheduler and mta logic: connection reuse, optimizes batches
* improved queue: more tolerant to admin errors, new layout, less disk-IO
* improved memory usage under high load
* SSL certs/keys isolated to lookup process to avoid facing network
* VIRTUAL support improved, fully virtual setups possible now
* runtime tracing of processes through smtpctl trace
* ssl_privsep.c sync-ed with relayd
* ssl.c no longer contains smtpd specific interfaces
* smtpd-specific ssl bits moved to ssl_smtpd.c
* update mail address in copyright
FLUSH YOUR QUEUE. FLUSH YOUR QUEUE. FLUSH YOUR QUEUE. FLUSH YOUR QUEUE.
smtpd.conf(5) simplified, it will require adaptations
ok eric@
Diffstat (limited to 'usr.sbin/smtpd/queue.c')
-rw-r--r-- | usr.sbin/smtpd/queue.c | 654 |
1 files changed, 460 insertions, 194 deletions
diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c index a0841774ea2..c47492f3613 100644 --- a/usr.sbin/smtpd/queue.c +++ b/usr.sbin/smtpd/queue.c @@ -1,7 +1,7 @@ -/* $OpenBSD: queue.c,v 1.144 2012/11/23 09:25:44 eric Exp $ */ +/* $OpenBSD: queue.c,v 1.145 2013/01/26 09:37:23 gilles Exp $ */ /* - * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> + * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org> * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> * Copyright (c) 2012 Eric Faurot <eric@openbsd.org> * @@ -41,259 +41,382 @@ #include "smtpd.h" #include "log.h" -static void queue_imsg(struct imsgev *, struct imsg *); +static void queue_imsg(struct mproc *, struct imsg *); static void queue_timeout(int, short, void *); -static void queue_bounce(struct envelope *); -static void queue_pass_to_scheduler(struct imsgev *, struct imsg *); +static void queue_bounce(struct envelope *, struct delivery_bounce *); static void queue_shutdown(void); static void queue_sig_handler(int, short, void *); +static void queue_log(const struct envelope *, const char *, const char *); + +static size_t flow_agent_hiwat = 10 * 1024 * 1024; +static size_t flow_agent_lowat = 1 * 1024 * 1024; +static size_t flow_scheduler_hiwat = 10 * 1024 * 1024; +static size_t flow_scheduler_lowat = 1 * 1024 * 1024; + +#define LIMIT_AGENT 0x01 +#define LIMIT_SCHEDULER 0x02 + +static int limit = 0; static void -queue_imsg(struct imsgev *iev, struct imsg *imsg) +queue_imsg(struct mproc *p, struct imsg *imsg) { - struct evpstate *state; + struct delivery_bounce bounce; + struct bounce_req_msg *req_bounce; + struct envelope evp; static uint64_t batch_id; - struct submit_status ss; - struct envelope *e, evp; - int fd, ret; - uint64_t id; + struct msg m; + const char *reason; + uint64_t reqid, evpid; uint32_t msgid; + time_t nexttry; + int fd, ret, v, flags; - if (iev->proc == PROC_SMTP) { - e = imsg->data; + if (p->proc == PROC_SMTP) { switch (imsg->hdr.type) { case IMSG_QUEUE_CREATE_MESSAGE: - ss.id = e->session_id; - ss.code = 250; - ss.u.msgid = 0; - ret = queue_message_create(&ss.u.msgid); + m_msg(&m, imsg); + m_get_id(&m, &reqid); + m_end(&m); + + ret = queue_message_create(&msgid); + + m_create(p, IMSG_QUEUE_CREATE_MESSAGE, 0, 0, -1, 24); + m_add_id(p, reqid); if (ret == 0) - ss.code = 421; - imsg_compose_event(iev, IMSG_QUEUE_CREATE_MESSAGE, 0, 0, - -1, &ss, sizeof ss); + m_add_int(p, 0); + else { + m_add_int(p, 1); + m_add_msgid(p, msgid); + } + m_close(p); return; case IMSG_QUEUE_REMOVE_MESSAGE: - msgid = *(uint32_t*)(imsg->data); - queue_message_incoming_delete(msgid); - imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], - IMSG_QUEUE_REMOVE_MESSAGE, 0, 0, -1, - &msgid, sizeof msgid); + m_msg(&m, imsg); + m_get_msgid(&m, &msgid); + m_end(&m); + + queue_message_delete(msgid); + + m_create(p_scheduler, IMSG_QUEUE_REMOVE_MESSAGE, + 0, 0, -1, 5); + m_add_msgid(p_scheduler, msgid); + m_close(p_scheduler); return; case IMSG_QUEUE_COMMIT_MESSAGE: - ss.id = e->session_id; - ss.code = 250; - msgid = evpid_to_msgid(e->id); - if (queue_message_commit(msgid)) { - imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], - IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, - &msgid, sizeof msgid); - } else - ss.code = 421; - - imsg_compose_event(iev, IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, - -1, &ss, sizeof ss); + m_msg(&m, imsg); + m_get_id(&m, &reqid); + m_get_msgid(&m, &msgid); + m_end(&m); + + ret = queue_message_commit(msgid); + + m_create(p, IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, 16); + m_add_id(p, reqid); + m_add_int(p, (ret == 0) ? 0 : 1); + m_close(p); + + if (ret) { + m_create(p_scheduler, IMSG_QUEUE_COMMIT_MESSAGE, + 0, 0, -1, 5); + m_add_msgid(p_scheduler, msgid); + m_close(p_scheduler); + } return; case IMSG_QUEUE_MESSAGE_FILE: - ss.id = e->session_id; - fd = queue_message_fd_rw(evpid_to_msgid(e->id)); - if (fd == -1) - ss.code = 421; - imsg_compose_event(iev, IMSG_QUEUE_MESSAGE_FILE, 0, 0, - fd, &ss, sizeof ss); + m_msg(&m, imsg); + m_get_id(&m, &reqid); + m_get_msgid(&m, &msgid); + m_end(&m); + + fd = queue_message_fd_rw(msgid); + + m_create(p, IMSG_QUEUE_MESSAGE_FILE, 0, 0, fd, 16); + m_add_id(p, reqid); + m_add_int(p, (fd == -1) ? 0 : 1); + m_close(p); return; - case IMSG_SMTP_ENQUEUE: - id = *(uint64_t*)(imsg->data); - bounce_run(id, imsg->fd); + case IMSG_SMTP_ENQUEUE_FD: + bounce_fd(imsg->fd); return; } } - if (iev->proc == PROC_LKA) { - e = imsg->data; + if (p->proc == PROC_LKA) { switch (imsg->hdr.type) { case IMSG_QUEUE_SUBMIT_ENVELOPE: - if (!queue_envelope_create(e)) { - ss.id = e->session_id; - ss.code = 421; - imsg_compose_event(env->sc_ievs[PROC_SMTP], - IMSG_QUEUE_TEMPFAIL, 0, 0, -1, &ss, - sizeof ss); - } else { - /* tell the scheduler */ - imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], - IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, e, - sizeof *e); + m_msg(&m, imsg); + m_get_id(&m, &reqid); + m_get_envelope(&m, &evp); + m_end(&m); + + if (evp.id == 0) + log_warn("warn: imsg_queue_submit_envelope: evpid=0"); + if (evpid_to_msgid(evp.id) == 0) + log_warn("warn: imsg_queue_submit_envelope: msgid=0, " + "evpid=%016"PRIx64, evp.id); + ret = queue_envelope_create(&evp); + m_create(p_smtp, IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, + 24); + m_add_id(p_smtp, reqid); + if (ret == 0) + m_add_int(p_smtp, 0); + else { + m_add_int(p_smtp, 1); + m_add_evpid(p_smtp, evp.id); + } + m_close(p_smtp); + if (ret) { + m_create(p_scheduler, + IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, + MSZ_EVP); + m_add_envelope(p_scheduler, &evp); + m_close(p_scheduler); + } return; case IMSG_QUEUE_COMMIT_ENVELOPES: - ss.id = e->session_id; - ss.code = 250; - imsg_compose_event(env->sc_ievs[PROC_SMTP], - IMSG_QUEUE_COMMIT_ENVELOPES, 0, 0, -1, &ss, - sizeof ss); + m_msg(&m, imsg); + m_get_id(&m, &reqid); + m_end(&m); + m_create(p_smtp, IMSG_QUEUE_COMMIT_ENVELOPES, 0, 0, -1, + 16); + m_add_id(p_smtp, reqid); + m_add_int(p_smtp, 1); + m_close(p_smtp); return; } } - if (iev->proc == PROC_SCHEDULER) { + if (p->proc == PROC_SCHEDULER) { switch (imsg->hdr.type) { case IMSG_QUEUE_REMOVE: - id = *(uint64_t*)(imsg->data); - if (queue_envelope_load(id, &evp) == 0) - errx(1, "cannot load evp:%016" PRIx64, id); - log_envelope(&evp, NULL, "Remove", - "Removed by administrator"); - queue_envelope_delete(&evp); + m_msg(&m, imsg); + m_get_evpid(&m, &evpid); + m_end(&m); + if (queue_envelope_load(evpid, &evp) == 0) + errx(1, "cannot load evp:%016" PRIx64, evpid); + queue_log(&evp, "Remove", "Removed by administrator"); + queue_envelope_delete(evpid); return; case IMSG_QUEUE_EXPIRE: - id = *(uint64_t*)(imsg->data); - if (queue_envelope_load(id, &evp) == 0) - errx(1, "cannot load evp:%016" PRIx64, id); + m_msg(&m, imsg); + m_get_evpid(&m, &evpid); + m_end(&m); + if (queue_envelope_load(evpid, &evp) == 0) + errx(1, "cannot load evp:%016" PRIx64, evpid); + bounce.type = B_ERROR; + bounce.delay = 0; + bounce.expire = 0; envelope_set_errormsg(&evp, "Envelope expired"); - queue_bounce(&evp); - log_envelope(&evp, NULL, "Expire", evp.errorline); - queue_envelope_delete(&evp); + queue_bounce(&evp, &bounce); + queue_log(&evp, "Expire", "Envelope expired"); + queue_envelope_delete(evpid); return; - case IMSG_MDA_SESS_NEW: - id = *(uint64_t*)(imsg->data); - if (queue_envelope_load(id, &evp) == 0) - errx(1, "cannot load evp:%016" PRIx64, id); + case IMSG_QUEUE_BOUNCE: + req_bounce = imsg->data; + evpid = req_bounce->evpid; + if (queue_envelope_load(evpid, &evp) == 0) + errx(1, "cannot load evp:%016" PRIx64, evpid); + queue_bounce(&evp, &req_bounce->bounce); + evp.lastbounce = req_bounce->timestamp; + queue_envelope_update(&evp); + return; + + case IMSG_MDA_DELIVER: + m_msg(&m, imsg); + m_get_evpid(&m, &evpid); + m_end(&m); + if (queue_envelope_load(evpid, &evp) == 0) + errx(1, "cannot load evp:%016" PRIx64, evpid); evp.lasttry = time(NULL); - imsg_compose_event(env->sc_ievs[PROC_MDA], - IMSG_MDA_SESS_NEW, 0, 0, -1, &evp, sizeof evp); + m_create(p_mda, IMSG_MDA_DELIVER, 0, 0, -1, MSZ_EVP); + m_add_envelope(p_mda, &evp); + m_close(p_mda); return; - case IMSG_SMTP_ENQUEUE: - id = *(uint64_t*)(imsg->data); - bounce_add(id); + case IMSG_BOUNCE_INJECT: + m_msg(&m, imsg); + m_get_evpid(&m, &evpid); + m_end(&m); + bounce_add(evpid); return; - case IMSG_BATCH_CREATE: + case IMSG_MTA_BATCH: batch_id = generate_uid(); - imsg_compose_event(env->sc_ievs[PROC_MTA], - IMSG_BATCH_CREATE, 0, 0, -1, - &batch_id, sizeof batch_id); + m_create(p_mta, IMSG_MTA_BATCH, 0, 0, -1, 9); + m_add_id(p_mta, batch_id); + m_close(p_mta); return; - case IMSG_BATCH_APPEND: - id = *(uint64_t*)(imsg->data); - if (queue_envelope_load(id, &evp) == 0) - errx(1, "cannot load evp:%016" PRIx64, id); + case IMSG_MTA_BATCH_ADD: + m_msg(&m, imsg); + m_get_evpid(&m, &evpid); + m_end(&m); + if (queue_envelope_load(evpid, &evp) == 0) + errx(1, "cannot load evp:%016" PRIx64, evpid); evp.lasttry = time(NULL); - evp.batch_id = batch_id; - imsg_compose_event(env->sc_ievs[PROC_MTA], - IMSG_BATCH_APPEND, 0, 0, -1, &evp, sizeof evp); + m_create(p_mta, IMSG_MTA_BATCH_ADD, 0, 0, -1, MSZ_EVP); + m_add_id(p_mta, batch_id); + m_add_envelope(p_mta, &evp); + m_close(p_mta); return; - case IMSG_BATCH_CLOSE: - imsg_compose_event(env->sc_ievs[PROC_MTA], - IMSG_BATCH_CLOSE, 0, 0, -1, - &batch_id, sizeof batch_id); + case IMSG_MTA_BATCH_END: + m_create(p_mta, IMSG_MTA_BATCH_END, 0, 0, -1, 9); + m_add_id(p_mta, batch_id); + m_close(p_mta); return; - case IMSG_SCHEDULER_ENVELOPES: + case IMSG_CTL_LIST_ENVELOPES: if (imsg->hdr.len == sizeof imsg->hdr) { - imsg_compose_event(env->sc_ievs[PROC_CONTROL], - IMSG_SCHEDULER_ENVELOPES, imsg->hdr.peerid, - 0, -1, NULL, 0); + m_forward(p_control, imsg); return; } - state = imsg->data; - if (queue_envelope_load(state->evpid, &evp) == 0) + + m_msg(&m, imsg); + m_get_evpid(&m, &evpid); + m_get_int(&m, &flags); + m_get_time(&m, &nexttry); + m_end(&m); + + if (queue_envelope_load(evpid, &evp) == 0) return; /* Envelope is gone, drop it */ + /* * XXX consistency: The envelope might already be on * its way back to the scheduler. We need to detect * this properly and report that state. */ - evp.flags |= state->flags; + evp.flags |= flags; /* In the past if running or runnable */ - evp.nexttry = state->time; - if (state->flags == DF_INFLIGHT) { + evp.nexttry = nexttry; + if (flags == EF_INFLIGHT) { /* * Not exactly correct but pretty close: The * value is not recorded on the envelope unless * a tempfail occurs. */ - evp.lasttry = state->time; + evp.lasttry = nexttry; } - imsg_compose_event(env->sc_ievs[PROC_CONTROL], - IMSG_SCHEDULER_ENVELOPES, imsg->hdr.peerid, 0, -1, - &evp, sizeof evp); + m_compose(p_control, IMSG_CTL_LIST_ENVELOPES, + imsg->hdr.peerid, 0, -1, &evp, sizeof evp); return; } } - if (iev->proc == PROC_MTA || iev->proc == PROC_MDA) { + if (p->proc == PROC_MTA || p->proc == PROC_MDA) { switch (imsg->hdr.type) { case IMSG_QUEUE_MESSAGE_FD: - fd = queue_message_fd_r(imsg->hdr.peerid); - imsg_compose_event(iev, IMSG_QUEUE_MESSAGE_FD, 0, 0, - fd, imsg->data, imsg->hdr.len - sizeof imsg->hdr); + m_msg(&m, imsg); + m_get_id(&m, &reqid); + m_get_msgid(&m, &msgid); + m_end(&m); + fd = queue_message_fd_r(msgid); + m_create(p, IMSG_QUEUE_MESSAGE_FD, 0, 0, fd, 25); + m_add_id(p, reqid); + m_close(p); return; - case IMSG_QUEUE_DELIVERY_OK: - e = imsg->data; - queue_envelope_delete(e); - imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], - IMSG_QUEUE_DELIVERY_OK, 0, 0, -1, &e->id, - sizeof e->id); + case IMSG_DELIVERY_OK: + m_msg(&m, imsg); + m_get_evpid(&m, &evpid); + m_end(&m); + queue_envelope_delete(evpid); + m_create(p_scheduler, IMSG_DELIVERY_OK, 0, 0, -1, 9); + m_add_evpid(p_scheduler, evpid); + m_close(p_scheduler); return; - case IMSG_QUEUE_DELIVERY_TEMPFAIL: - e = imsg->data; - e->retry++; - queue_envelope_update(e); - imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], - IMSG_QUEUE_DELIVERY_TEMPFAIL, 0, 0, -1, e, - sizeof *e); + case IMSG_DELIVERY_TEMPFAIL: + m_msg(&m, imsg); + m_get_evpid(&m, &evpid); + m_get_string(&m, &reason); + m_end(&m); + if (queue_envelope_load(evpid, &evp) == 0) + errx(1, "cannot load evp:%016" PRIx64, evpid); + envelope_set_errormsg(&evp, "%s", reason); + evp.retry++; + queue_envelope_update(&evp); + m_create(p_scheduler, IMSG_DELIVERY_TEMPFAIL, 0, 0, -1, + MSZ_EVP); + m_add_envelope(p_scheduler, &evp); + m_close(p_scheduler); return; - case IMSG_QUEUE_DELIVERY_PERMFAIL: - e = imsg->data; - queue_bounce(e); - queue_envelope_delete(e); - imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], - IMSG_QUEUE_DELIVERY_PERMFAIL, 0, 0, -1, &e->id, - sizeof e->id); + case IMSG_DELIVERY_PERMFAIL: + m_msg(&m, imsg); + m_get_evpid(&m, &evpid); + m_get_string(&m, &reason); + m_end(&m); + if (queue_envelope_load(evpid, &evp) == 0) + errx(1, "cannot load evp:%016" PRIx64, evpid); + bounce.type = B_ERROR; + bounce.delay = 0; + bounce.expire = 0; + envelope_set_errormsg(&evp, "%s", reason); + queue_bounce(&evp, &bounce); + queue_envelope_delete(evpid); + m_create(p_scheduler, IMSG_DELIVERY_PERMFAIL, 0, 0, -1, + 9); + m_add_evpid(p_scheduler, evpid); + m_close(p_scheduler); return; - case IMSG_QUEUE_DELIVERY_LOOP: - e = imsg->data; - queue_bounce(e); - queue_envelope_delete(e); - imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], - IMSG_QUEUE_DELIVERY_LOOP, 0, 0, -1, &e->id, - sizeof e->id); + case IMSG_DELIVERY_LOOP: + m_msg(&m, imsg); + m_get_evpid(&m, &evpid); + m_end(&m); + if (queue_envelope_load(evpid, &evp) == 0) + errx(1, "cannot load evp:%016" PRIx64, evpid); + envelope_set_errormsg(&evp, "%s", "Loop detected"); + bounce.type = B_ERROR; + bounce.delay = 0; + bounce.expire = 0; + queue_bounce(&evp, &bounce); + queue_envelope_delete(evp.id); + m_create(p_scheduler, IMSG_DELIVERY_LOOP, 0, 0, -1, 9); + m_add_evpid(p_scheduler, evp.id); + m_close(p_scheduler); return; } } - if (iev->proc == PROC_CONTROL) { + if (p->proc == PROC_CONTROL) { switch (imsg->hdr.type) { - case IMSG_QUEUE_PAUSE_MDA: - case IMSG_QUEUE_PAUSE_MTA: - case IMSG_QUEUE_RESUME_MDA: - case IMSG_QUEUE_RESUME_MTA: + case IMSG_CTL_PAUSE_MDA: + case IMSG_CTL_PAUSE_MTA: + case IMSG_CTL_RESUME_MDA: + case IMSG_CTL_RESUME_MTA: case IMSG_QUEUE_REMOVE: - queue_pass_to_scheduler(iev, imsg); + m_forward(p_scheduler, imsg); return; } } - if (iev->proc == PROC_PARENT) { + if (p->proc == PROC_PARENT) { switch (imsg->hdr.type) { case IMSG_CTL_VERBOSE: - log_verbose(*(int *)imsg->data); - queue_pass_to_scheduler(iev, imsg); + m_msg(&m, imsg); + m_get_int(&m, &v); + m_end(&m); + log_verbose(v); + m_forward(p_scheduler, imsg); + return; + + case IMSG_CTL_PROFILE: + m_msg(&m, imsg); + m_get_int(&m, &v); + m_end(&m); + profiling = v; return; } } @@ -302,26 +425,23 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg) } static void -queue_pass_to_scheduler(struct imsgev *iev, struct imsg *imsg) -{ - imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], imsg->hdr.type, - iev->proc, imsg->hdr.pid, imsg->fd, imsg->data, - imsg->hdr.len - sizeof imsg->hdr); -} - -static void -queue_bounce(struct envelope *e) +queue_bounce(struct envelope *e, struct delivery_bounce *d) { struct envelope b; - uint32_t msgid; b = *e; b.type = D_BOUNCE; + b.agent.bounce = *d; b.retry = 0; b.lasttry = 0; b.creation = time(NULL); b.expire = 3600 * 24 * 7; + if (b.id == 0) + log_warn("warn: queue_bounce: evpid=0"); + if (evpid_to_msgid(b.id) == 0) + log_warn("warn: queue_bounce: msgid=0, evpid=%016"PRIx64, + b.id); if (e->type == D_BOUNCE) { log_warnx("warn: queue: double bounce!"); } else if (e->sender.user[0] == '\0') { @@ -331,11 +451,16 @@ queue_bounce(struct envelope *e) } else { log_debug("debug: queue: bouncing evp:%016" PRIx64 " as evp:%016" PRIx64, e->id, b.id); - imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], - IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, &b, sizeof b); - msgid = evpid_to_msgid(b.id); - imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], - IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &msgid, sizeof msgid); + + m_create(p_scheduler, IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, + MSZ_EVP); + m_add_envelope(p_scheduler, &b); + m_close(p_scheduler); + + m_create(p_scheduler, IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, 5); + m_add_msgid(p_scheduler, evpid_to_msgid(b.id)); + m_close(p_scheduler); + stat_increment("queue.bounce", 1); } } @@ -360,6 +485,32 @@ queue_shutdown(void) _exit(0); } +static void +queue_set_sndbuf(struct mproc *p, int sz) +{ + int osz; + socklen_t sl; + + sl = sizeof(osz); + if (getsockopt(p->imsgbuf.fd, SOL_SOCKET, SO_SNDBUF, &osz, &sl) == -1) { + log_warn("warn: getsockopt"); + return; + } + if (osz == sz) + return; + + if (setsockopt(p->imsgbuf.fd, SOL_SOCKET, SO_SNDBUF, &sz, sl) == -1) { + log_warn("warn: setsockopt"); + return; + } + if (getsockopt(p->imsgbuf.fd, SOL_SOCKET, SO_SNDBUF, &sz, &sl) == -1) { + log_warn("warn: getsockopt"); + return; + } + log_debug("debug: queue: adjusted output buffer size for %s: %i -> %i", + p->name, osz, sz); +} + pid_t queue(void) { @@ -370,36 +521,29 @@ queue(void) struct event ev_sigint; struct event ev_sigterm; - struct peer peers[] = { - { PROC_PARENT, imsg_dispatch }, - { PROC_CONTROL, imsg_dispatch }, - { PROC_SMTP, imsg_dispatch }, - { PROC_MDA, imsg_dispatch }, - { PROC_MTA, imsg_dispatch }, - { PROC_LKA, imsg_dispatch }, - { PROC_SCHEDULER, imsg_dispatch } - }; - switch (pid = fork()) { case -1: fatal("queue: cannot fork"); case 0: + env->sc_pid = getpid(); break; default: return (pid); } purge_config(PURGE_EVERYTHING); + if (env->sc_pwqueue) { + free(env->sc_pw); + env->sc_pw = env->sc_pwqueue; + } pw = env->sc_pw; - if (chroot(PATH_SPOOL) == -1) fatal("queue: chroot"); if (chdir("/") == -1) fatal("queue: chdir(\"/\")"); - smtpd_process = PROC_QUEUE; - setproctitle("%s", env->sc_title[smtpd_process]); + config_process(PROC_QUEUE); if (setgroups(1, &pw->pw_gid) || setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) || @@ -418,8 +562,18 @@ queue(void) fdlimit(1.0); - config_pipes(peers, nitems(peers)); - config_peers(peers, nitems(peers)); + config_peer(PROC_PARENT); + config_peer(PROC_CONTROL); + config_peer(PROC_SMTP); + config_peer(PROC_MDA); + config_peer(PROC_MTA); + config_peer(PROC_LKA); + config_peer(PROC_SCHEDULER); + config_done(); + + queue_set_sndbuf(p_scheduler, 65536); + queue_set_sndbuf(p_mta, 65536); + queue_set_sndbuf(p_mda, 65536); /* setup queue loading task */ evtimer_set(&ev_qload, queue_timeout, &ev_qload); @@ -445,25 +599,137 @@ queue_timeout(int fd, short event, void *p) r = queue_envelope_walk(&evp); if (r == -1) { - if (msgid) - imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], - IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &msgid, - sizeof msgid); + if (msgid) { + m_create(p_scheduler, IMSG_QUEUE_COMMIT_MESSAGE, + 0, 0, -1, 5); + m_add_msgid(p_scheduler, msgid); + m_close(p_scheduler); + } log_debug("debug: queue: done loading queue into scheduler"); return; } if (r) { - if (msgid && evpid_to_msgid(evp.id) != msgid) - imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], - IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &msgid, - sizeof msgid); + if (msgid && evpid_to_msgid(evp.id) != msgid) { + m_create(p_scheduler, IMSG_QUEUE_COMMIT_MESSAGE, + 0, 0, -1, 5); + m_add_msgid(p_scheduler, msgid); + m_close(p_scheduler); + } msgid = evpid_to_msgid(evp.id); - imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], - IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, &evp, sizeof evp); + m_create(p_scheduler, IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, + MSZ_EVP); + m_add_envelope(p_scheduler, &evp); + m_close(p_scheduler); } tv.tv_sec = 0; tv.tv_usec = 10; evtimer_add(ev, &tv); } + +void +queue_ok(uint64_t evpid) +{ + m_create(p_queue, IMSG_DELIVERY_OK, 0, 0, -1, sizeof(evpid) + 1); + m_add_evpid(p_queue, evpid); + m_close(p_queue); +} + +void +queue_tempfail(uint64_t evpid, const char *reason) +{ + m_create(p_queue, IMSG_DELIVERY_TEMPFAIL, 0, 0, -1, + sizeof(evpid) + strlen(reason) + 2); + m_add_evpid(p_queue, evpid); + m_add_string(p_queue, reason); + m_close(p_queue); +} + +void +queue_permfail(uint64_t evpid, const char *reason) +{ + m_create(p_queue, IMSG_DELIVERY_PERMFAIL, 0, 0, -1, + sizeof(evpid) + strlen(reason) + 2); + m_add_evpid(p_queue, evpid); + m_add_string(p_queue, reason); + m_close(p_queue); +} + +void +queue_loop(uint64_t evpid) +{ + m_create(p_queue, IMSG_DELIVERY_LOOP, 0, 0, -1, sizeof(evpid) + 1); + m_add_evpid(p_queue, evpid); + m_close(p_queue); +} + +static void +queue_log(const struct envelope *e, const char *prefix, const char *status) +{ + char rcpt[MAX_LINE_SIZE]; + + rcpt[0] = '\0'; + if (strcmp(e->rcpt.user, e->dest.user) || + strcmp(e->rcpt.domain, e->dest.domain)) + snprintf(rcpt, sizeof rcpt, "rcpt=<%s@%s>, ", + e->rcpt.user, e->rcpt.domain); + + log_info("%s: %s for %016" PRIx64 ": from=<%s@%s>, to=<%s@%s>, " + "%sdelay=%s, stat=%s", + e->type == D_MDA ? "delivery" : "relay", + prefix, + e->id, e->sender.user, e->sender.domain, + e->dest.user, e->dest.domain, + rcpt, + duration_to_text(time(NULL) - e->creation), + status); +} + +void +queue_flow_control(void) +{ + size_t bufsz; + int oldlimit = limit; + int set, unset; + + bufsz = p_mda->bytes_queued + p_mta->bytes_queued; + if (bufsz <= flow_agent_lowat) + limit &= ~LIMIT_AGENT; + else if (bufsz > flow_agent_hiwat) + limit |= LIMIT_AGENT; + + if (p_scheduler->bytes_queued <= flow_scheduler_lowat) + limit &= ~LIMIT_SCHEDULER; + else if (p_scheduler->bytes_queued > flow_scheduler_hiwat) + limit |= LIMIT_SCHEDULER; + + set = limit & (limit ^ oldlimit); + unset = oldlimit & (limit ^ oldlimit); + + if (set & LIMIT_SCHEDULER) { + log_warnx("warn: queue: Hiwat reached on scheduler buffer: " + "suspending transfer, delivery and lookup input"); + mproc_disable(p_mta); + mproc_disable(p_mda); + mproc_disable(p_lka); + } + else if (unset & LIMIT_SCHEDULER) { + log_warnx("warn: queue: Down to lowat on scheduler buffer: " + "resuming transfer, delivery and lookup input"); + mproc_enable(p_mta); + mproc_enable(p_mda); + mproc_enable(p_lka); + } + + if (set & LIMIT_AGENT) { + log_warnx("warn: queue: Hiwat reached on transfer and delivery " + "buffers: suspending scheduler input"); + mproc_disable(p_scheduler); + } + else if (unset & LIMIT_AGENT) { + log_warnx("warn: queue: Down to lowat on transfer and delivery " + "buffers: resuming scheduler input"); + mproc_enable(p_scheduler); + } +} |