summaryrefslogtreecommitdiff
path: root/usr.sbin/smtpd/queue.c
diff options
context:
space:
mode:
authorGilles Chehade <gilles@cvs.openbsd.org>2013-01-26 09:37:25 +0000
committerGilles Chehade <gilles@cvs.openbsd.org>2013-01-26 09:37:25 +0000
commit52e93b0e61fd0a116dbb373054e2cd0ea3bfcf39 (patch)
tree41934d0fc43bfebf55ba5a199e0d699adf24aff1 /usr.sbin/smtpd/queue.c
parent3b78bd2481525635417ca0fc75396ef754c09171 (diff)
Sync with our smtpd repo:
* first bricks of ldap and sqlite support (not finished but both working) * new table API to replace map API, all lookups are done through tables * improved handling of temporary errors throughout the daemon * improved scheduler and mta logic: connection reuse, optimizes batches * improved queue: more tolerant to admin errors, new layout, less disk-IO * improved memory usage under high load * SSL certs/keys isolated to lookup process to avoid facing network * VIRTUAL support improved, fully virtual setups possible now * runtime tracing of processes through smtpctl trace * ssl_privsep.c sync-ed with relayd * ssl.c no longer contains smtpd specific interfaces * smtpd-specific ssl bits moved to ssl_smtpd.c * update mail address in copyright FLUSH YOUR QUEUE. FLUSH YOUR QUEUE. FLUSH YOUR QUEUE. FLUSH YOUR QUEUE. smtpd.conf(5) simplified, it will require adaptations ok eric@
Diffstat (limited to 'usr.sbin/smtpd/queue.c')
-rw-r--r--usr.sbin/smtpd/queue.c654
1 files changed, 460 insertions, 194 deletions
diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c
index a0841774ea2..c47492f3613 100644
--- a/usr.sbin/smtpd/queue.c
+++ b/usr.sbin/smtpd/queue.c
@@ -1,7 +1,7 @@
-/* $OpenBSD: queue.c,v 1.144 2012/11/23 09:25:44 eric Exp $ */
+/* $OpenBSD: queue.c,v 1.145 2013/01/26 09:37:23 gilles Exp $ */
/*
- * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
+ * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org>
* Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
* Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
*
@@ -41,259 +41,382 @@
#include "smtpd.h"
#include "log.h"
-static void queue_imsg(struct imsgev *, struct imsg *);
+static void queue_imsg(struct mproc *, struct imsg *);
static void queue_timeout(int, short, void *);
-static void queue_bounce(struct envelope *);
-static void queue_pass_to_scheduler(struct imsgev *, struct imsg *);
+static void queue_bounce(struct envelope *, struct delivery_bounce *);
static void queue_shutdown(void);
static void queue_sig_handler(int, short, void *);
+static void queue_log(const struct envelope *, const char *, const char *);
+
+static size_t flow_agent_hiwat = 10 * 1024 * 1024;
+static size_t flow_agent_lowat = 1 * 1024 * 1024;
+static size_t flow_scheduler_hiwat = 10 * 1024 * 1024;
+static size_t flow_scheduler_lowat = 1 * 1024 * 1024;
+
+#define LIMIT_AGENT 0x01
+#define LIMIT_SCHEDULER 0x02
+
+static int limit = 0;
static void
-queue_imsg(struct imsgev *iev, struct imsg *imsg)
+queue_imsg(struct mproc *p, struct imsg *imsg)
{
- struct evpstate *state;
+ struct delivery_bounce bounce;
+ struct bounce_req_msg *req_bounce;
+ struct envelope evp;
static uint64_t batch_id;
- struct submit_status ss;
- struct envelope *e, evp;
- int fd, ret;
- uint64_t id;
+ struct msg m;
+ const char *reason;
+ uint64_t reqid, evpid;
uint32_t msgid;
+ time_t nexttry;
+ int fd, ret, v, flags;
- if (iev->proc == PROC_SMTP) {
- e = imsg->data;
+ if (p->proc == PROC_SMTP) {
switch (imsg->hdr.type) {
case IMSG_QUEUE_CREATE_MESSAGE:
- ss.id = e->session_id;
- ss.code = 250;
- ss.u.msgid = 0;
- ret = queue_message_create(&ss.u.msgid);
+ m_msg(&m, imsg);
+ m_get_id(&m, &reqid);
+ m_end(&m);
+
+ ret = queue_message_create(&msgid);
+
+ m_create(p, IMSG_QUEUE_CREATE_MESSAGE, 0, 0, -1, 24);
+ m_add_id(p, reqid);
if (ret == 0)
- ss.code = 421;
- imsg_compose_event(iev, IMSG_QUEUE_CREATE_MESSAGE, 0, 0,
- -1, &ss, sizeof ss);
+ m_add_int(p, 0);
+ else {
+ m_add_int(p, 1);
+ m_add_msgid(p, msgid);
+ }
+ m_close(p);
return;
case IMSG_QUEUE_REMOVE_MESSAGE:
- msgid = *(uint32_t*)(imsg->data);
- queue_message_incoming_delete(msgid);
- imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
- IMSG_QUEUE_REMOVE_MESSAGE, 0, 0, -1,
- &msgid, sizeof msgid);
+ m_msg(&m, imsg);
+ m_get_msgid(&m, &msgid);
+ m_end(&m);
+
+ queue_message_delete(msgid);
+
+ m_create(p_scheduler, IMSG_QUEUE_REMOVE_MESSAGE,
+ 0, 0, -1, 5);
+ m_add_msgid(p_scheduler, msgid);
+ m_close(p_scheduler);
return;
case IMSG_QUEUE_COMMIT_MESSAGE:
- ss.id = e->session_id;
- ss.code = 250;
- msgid = evpid_to_msgid(e->id);
- if (queue_message_commit(msgid)) {
- imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
- IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1,
- &msgid, sizeof msgid);
- } else
- ss.code = 421;
-
- imsg_compose_event(iev, IMSG_QUEUE_COMMIT_MESSAGE, 0, 0,
- -1, &ss, sizeof ss);
+ m_msg(&m, imsg);
+ m_get_id(&m, &reqid);
+ m_get_msgid(&m, &msgid);
+ m_end(&m);
+
+ ret = queue_message_commit(msgid);
+
+ m_create(p, IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, 16);
+ m_add_id(p, reqid);
+ m_add_int(p, (ret == 0) ? 0 : 1);
+ m_close(p);
+
+ if (ret) {
+ m_create(p_scheduler, IMSG_QUEUE_COMMIT_MESSAGE,
+ 0, 0, -1, 5);
+ m_add_msgid(p_scheduler, msgid);
+ m_close(p_scheduler);
+ }
return;
case IMSG_QUEUE_MESSAGE_FILE:
- ss.id = e->session_id;
- fd = queue_message_fd_rw(evpid_to_msgid(e->id));
- if (fd == -1)
- ss.code = 421;
- imsg_compose_event(iev, IMSG_QUEUE_MESSAGE_FILE, 0, 0,
- fd, &ss, sizeof ss);
+ m_msg(&m, imsg);
+ m_get_id(&m, &reqid);
+ m_get_msgid(&m, &msgid);
+ m_end(&m);
+
+ fd = queue_message_fd_rw(msgid);
+
+ m_create(p, IMSG_QUEUE_MESSAGE_FILE, 0, 0, fd, 16);
+ m_add_id(p, reqid);
+ m_add_int(p, (fd == -1) ? 0 : 1);
+ m_close(p);
return;
- case IMSG_SMTP_ENQUEUE:
- id = *(uint64_t*)(imsg->data);
- bounce_run(id, imsg->fd);
+ case IMSG_SMTP_ENQUEUE_FD:
+ bounce_fd(imsg->fd);
return;
}
}
- if (iev->proc == PROC_LKA) {
- e = imsg->data;
+ if (p->proc == PROC_LKA) {
switch (imsg->hdr.type) {
case IMSG_QUEUE_SUBMIT_ENVELOPE:
- if (!queue_envelope_create(e)) {
- ss.id = e->session_id;
- ss.code = 421;
- imsg_compose_event(env->sc_ievs[PROC_SMTP],
- IMSG_QUEUE_TEMPFAIL, 0, 0, -1, &ss,
- sizeof ss);
- } else {
- /* tell the scheduler */
- imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
- IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, e,
- sizeof *e);
+ m_msg(&m, imsg);
+ m_get_id(&m, &reqid);
+ m_get_envelope(&m, &evp);
+ m_end(&m);
+
+ if (evp.id == 0)
+ log_warn("warn: imsg_queue_submit_envelope: evpid=0");
+ if (evpid_to_msgid(evp.id) == 0)
+ log_warn("warn: imsg_queue_submit_envelope: msgid=0, "
+ "evpid=%016"PRIx64, evp.id);
+ ret = queue_envelope_create(&evp);
+ m_create(p_smtp, IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1,
+ 24);
+ m_add_id(p_smtp, reqid);
+ if (ret == 0)
+ m_add_int(p_smtp, 0);
+ else {
+ m_add_int(p_smtp, 1);
+ m_add_evpid(p_smtp, evp.id);
+ }
+ m_close(p_smtp);
+ if (ret) {
+ m_create(p_scheduler,
+ IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1,
+ MSZ_EVP);
+ m_add_envelope(p_scheduler, &evp);
+ m_close(p_scheduler);
+
}
return;
case IMSG_QUEUE_COMMIT_ENVELOPES:
- ss.id = e->session_id;
- ss.code = 250;
- imsg_compose_event(env->sc_ievs[PROC_SMTP],
- IMSG_QUEUE_COMMIT_ENVELOPES, 0, 0, -1, &ss,
- sizeof ss);
+ m_msg(&m, imsg);
+ m_get_id(&m, &reqid);
+ m_end(&m);
+ m_create(p_smtp, IMSG_QUEUE_COMMIT_ENVELOPES, 0, 0, -1,
+ 16);
+ m_add_id(p_smtp, reqid);
+ m_add_int(p_smtp, 1);
+ m_close(p_smtp);
return;
}
}
- if (iev->proc == PROC_SCHEDULER) {
+ if (p->proc == PROC_SCHEDULER) {
switch (imsg->hdr.type) {
case IMSG_QUEUE_REMOVE:
- id = *(uint64_t*)(imsg->data);
- if (queue_envelope_load(id, &evp) == 0)
- errx(1, "cannot load evp:%016" PRIx64, id);
- log_envelope(&evp, NULL, "Remove",
- "Removed by administrator");
- queue_envelope_delete(&evp);
+ m_msg(&m, imsg);
+ m_get_evpid(&m, &evpid);
+ m_end(&m);
+ if (queue_envelope_load(evpid, &evp) == 0)
+ errx(1, "cannot load evp:%016" PRIx64, evpid);
+ queue_log(&evp, "Remove", "Removed by administrator");
+ queue_envelope_delete(evpid);
return;
case IMSG_QUEUE_EXPIRE:
- id = *(uint64_t*)(imsg->data);
- if (queue_envelope_load(id, &evp) == 0)
- errx(1, "cannot load evp:%016" PRIx64, id);
+ m_msg(&m, imsg);
+ m_get_evpid(&m, &evpid);
+ m_end(&m);
+ if (queue_envelope_load(evpid, &evp) == 0)
+ errx(1, "cannot load evp:%016" PRIx64, evpid);
+ bounce.type = B_ERROR;
+ bounce.delay = 0;
+ bounce.expire = 0;
envelope_set_errormsg(&evp, "Envelope expired");
- queue_bounce(&evp);
- log_envelope(&evp, NULL, "Expire", evp.errorline);
- queue_envelope_delete(&evp);
+ queue_bounce(&evp, &bounce);
+ queue_log(&evp, "Expire", "Envelope expired");
+ queue_envelope_delete(evpid);
return;
- case IMSG_MDA_SESS_NEW:
- id = *(uint64_t*)(imsg->data);
- if (queue_envelope_load(id, &evp) == 0)
- errx(1, "cannot load evp:%016" PRIx64, id);
+ case IMSG_QUEUE_BOUNCE:
+ req_bounce = imsg->data;
+ evpid = req_bounce->evpid;
+ if (queue_envelope_load(evpid, &evp) == 0)
+ errx(1, "cannot load evp:%016" PRIx64, evpid);
+ queue_bounce(&evp, &req_bounce->bounce);
+ evp.lastbounce = req_bounce->timestamp;
+ queue_envelope_update(&evp);
+ return;
+
+ case IMSG_MDA_DELIVER:
+ m_msg(&m, imsg);
+ m_get_evpid(&m, &evpid);
+ m_end(&m);
+ if (queue_envelope_load(evpid, &evp) == 0)
+ errx(1, "cannot load evp:%016" PRIx64, evpid);
evp.lasttry = time(NULL);
- imsg_compose_event(env->sc_ievs[PROC_MDA],
- IMSG_MDA_SESS_NEW, 0, 0, -1, &evp, sizeof evp);
+ m_create(p_mda, IMSG_MDA_DELIVER, 0, 0, -1, MSZ_EVP);
+ m_add_envelope(p_mda, &evp);
+ m_close(p_mda);
return;
- case IMSG_SMTP_ENQUEUE:
- id = *(uint64_t*)(imsg->data);
- bounce_add(id);
+ case IMSG_BOUNCE_INJECT:
+ m_msg(&m, imsg);
+ m_get_evpid(&m, &evpid);
+ m_end(&m);
+ bounce_add(evpid);
return;
- case IMSG_BATCH_CREATE:
+ case IMSG_MTA_BATCH:
batch_id = generate_uid();
- imsg_compose_event(env->sc_ievs[PROC_MTA],
- IMSG_BATCH_CREATE, 0, 0, -1,
- &batch_id, sizeof batch_id);
+ m_create(p_mta, IMSG_MTA_BATCH, 0, 0, -1, 9);
+ m_add_id(p_mta, batch_id);
+ m_close(p_mta);
return;
- case IMSG_BATCH_APPEND:
- id = *(uint64_t*)(imsg->data);
- if (queue_envelope_load(id, &evp) == 0)
- errx(1, "cannot load evp:%016" PRIx64, id);
+ case IMSG_MTA_BATCH_ADD:
+ m_msg(&m, imsg);
+ m_get_evpid(&m, &evpid);
+ m_end(&m);
+ if (queue_envelope_load(evpid, &evp) == 0)
+ errx(1, "cannot load evp:%016" PRIx64, evpid);
evp.lasttry = time(NULL);
- evp.batch_id = batch_id;
- imsg_compose_event(env->sc_ievs[PROC_MTA],
- IMSG_BATCH_APPEND, 0, 0, -1, &evp, sizeof evp);
+ m_create(p_mta, IMSG_MTA_BATCH_ADD, 0, 0, -1, MSZ_EVP);
+ m_add_id(p_mta, batch_id);
+ m_add_envelope(p_mta, &evp);
+ m_close(p_mta);
return;
- case IMSG_BATCH_CLOSE:
- imsg_compose_event(env->sc_ievs[PROC_MTA],
- IMSG_BATCH_CLOSE, 0, 0, -1,
- &batch_id, sizeof batch_id);
+ case IMSG_MTA_BATCH_END:
+ m_create(p_mta, IMSG_MTA_BATCH_END, 0, 0, -1, 9);
+ m_add_id(p_mta, batch_id);
+ m_close(p_mta);
return;
- case IMSG_SCHEDULER_ENVELOPES:
+ case IMSG_CTL_LIST_ENVELOPES:
if (imsg->hdr.len == sizeof imsg->hdr) {
- imsg_compose_event(env->sc_ievs[PROC_CONTROL],
- IMSG_SCHEDULER_ENVELOPES, imsg->hdr.peerid,
- 0, -1, NULL, 0);
+ m_forward(p_control, imsg);
return;
}
- state = imsg->data;
- if (queue_envelope_load(state->evpid, &evp) == 0)
+
+ m_msg(&m, imsg);
+ m_get_evpid(&m, &evpid);
+ m_get_int(&m, &flags);
+ m_get_time(&m, &nexttry);
+ m_end(&m);
+
+ if (queue_envelope_load(evpid, &evp) == 0)
return; /* Envelope is gone, drop it */
+
/*
* XXX consistency: The envelope might already be on
* its way back to the scheduler. We need to detect
* this properly and report that state.
*/
- evp.flags |= state->flags;
+ evp.flags |= flags;
/* In the past if running or runnable */
- evp.nexttry = state->time;
- if (state->flags == DF_INFLIGHT) {
+ evp.nexttry = nexttry;
+ if (flags == EF_INFLIGHT) {
/*
* Not exactly correct but pretty close: The
* value is not recorded on the envelope unless
* a tempfail occurs.
*/
- evp.lasttry = state->time;
+ evp.lasttry = nexttry;
}
- imsg_compose_event(env->sc_ievs[PROC_CONTROL],
- IMSG_SCHEDULER_ENVELOPES, imsg->hdr.peerid, 0, -1,
- &evp, sizeof evp);
+ m_compose(p_control, IMSG_CTL_LIST_ENVELOPES,
+ imsg->hdr.peerid, 0, -1, &evp, sizeof evp);
return;
}
}
- if (iev->proc == PROC_MTA || iev->proc == PROC_MDA) {
+ if (p->proc == PROC_MTA || p->proc == PROC_MDA) {
switch (imsg->hdr.type) {
case IMSG_QUEUE_MESSAGE_FD:
- fd = queue_message_fd_r(imsg->hdr.peerid);
- imsg_compose_event(iev, IMSG_QUEUE_MESSAGE_FD, 0, 0,
- fd, imsg->data, imsg->hdr.len - sizeof imsg->hdr);
+ m_msg(&m, imsg);
+ m_get_id(&m, &reqid);
+ m_get_msgid(&m, &msgid);
+ m_end(&m);
+ fd = queue_message_fd_r(msgid);
+ m_create(p, IMSG_QUEUE_MESSAGE_FD, 0, 0, fd, 25);
+ m_add_id(p, reqid);
+ m_close(p);
return;
- case IMSG_QUEUE_DELIVERY_OK:
- e = imsg->data;
- queue_envelope_delete(e);
- imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
- IMSG_QUEUE_DELIVERY_OK, 0, 0, -1, &e->id,
- sizeof e->id);
+ case IMSG_DELIVERY_OK:
+ m_msg(&m, imsg);
+ m_get_evpid(&m, &evpid);
+ m_end(&m);
+ queue_envelope_delete(evpid);
+ m_create(p_scheduler, IMSG_DELIVERY_OK, 0, 0, -1, 9);
+ m_add_evpid(p_scheduler, evpid);
+ m_close(p_scheduler);
return;
- case IMSG_QUEUE_DELIVERY_TEMPFAIL:
- e = imsg->data;
- e->retry++;
- queue_envelope_update(e);
- imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
- IMSG_QUEUE_DELIVERY_TEMPFAIL, 0, 0, -1, e,
- sizeof *e);
+ case IMSG_DELIVERY_TEMPFAIL:
+ m_msg(&m, imsg);
+ m_get_evpid(&m, &evpid);
+ m_get_string(&m, &reason);
+ m_end(&m);
+ if (queue_envelope_load(evpid, &evp) == 0)
+ errx(1, "cannot load evp:%016" PRIx64, evpid);
+ envelope_set_errormsg(&evp, "%s", reason);
+ evp.retry++;
+ queue_envelope_update(&evp);
+ m_create(p_scheduler, IMSG_DELIVERY_TEMPFAIL, 0, 0, -1,
+ MSZ_EVP);
+ m_add_envelope(p_scheduler, &evp);
+ m_close(p_scheduler);
return;
- case IMSG_QUEUE_DELIVERY_PERMFAIL:
- e = imsg->data;
- queue_bounce(e);
- queue_envelope_delete(e);
- imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
- IMSG_QUEUE_DELIVERY_PERMFAIL, 0, 0, -1, &e->id,
- sizeof e->id);
+ case IMSG_DELIVERY_PERMFAIL:
+ m_msg(&m, imsg);
+ m_get_evpid(&m, &evpid);
+ m_get_string(&m, &reason);
+ m_end(&m);
+ if (queue_envelope_load(evpid, &evp) == 0)
+ errx(1, "cannot load evp:%016" PRIx64, evpid);
+ bounce.type = B_ERROR;
+ bounce.delay = 0;
+ bounce.expire = 0;
+ envelope_set_errormsg(&evp, "%s", reason);
+ queue_bounce(&evp, &bounce);
+ queue_envelope_delete(evpid);
+ m_create(p_scheduler, IMSG_DELIVERY_PERMFAIL, 0, 0, -1,
+ 9);
+ m_add_evpid(p_scheduler, evpid);
+ m_close(p_scheduler);
return;
- case IMSG_QUEUE_DELIVERY_LOOP:
- e = imsg->data;
- queue_bounce(e);
- queue_envelope_delete(e);
- imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
- IMSG_QUEUE_DELIVERY_LOOP, 0, 0, -1, &e->id,
- sizeof e->id);
+ case IMSG_DELIVERY_LOOP:
+ m_msg(&m, imsg);
+ m_get_evpid(&m, &evpid);
+ m_end(&m);
+ if (queue_envelope_load(evpid, &evp) == 0)
+ errx(1, "cannot load evp:%016" PRIx64, evpid);
+ envelope_set_errormsg(&evp, "%s", "Loop detected");
+ bounce.type = B_ERROR;
+ bounce.delay = 0;
+ bounce.expire = 0;
+ queue_bounce(&evp, &bounce);
+ queue_envelope_delete(evp.id);
+ m_create(p_scheduler, IMSG_DELIVERY_LOOP, 0, 0, -1, 9);
+ m_add_evpid(p_scheduler, evp.id);
+ m_close(p_scheduler);
return;
}
}
- if (iev->proc == PROC_CONTROL) {
+ if (p->proc == PROC_CONTROL) {
switch (imsg->hdr.type) {
- case IMSG_QUEUE_PAUSE_MDA:
- case IMSG_QUEUE_PAUSE_MTA:
- case IMSG_QUEUE_RESUME_MDA:
- case IMSG_QUEUE_RESUME_MTA:
+ case IMSG_CTL_PAUSE_MDA:
+ case IMSG_CTL_PAUSE_MTA:
+ case IMSG_CTL_RESUME_MDA:
+ case IMSG_CTL_RESUME_MTA:
case IMSG_QUEUE_REMOVE:
- queue_pass_to_scheduler(iev, imsg);
+ m_forward(p_scheduler, imsg);
return;
}
}
- if (iev->proc == PROC_PARENT) {
+ if (p->proc == PROC_PARENT) {
switch (imsg->hdr.type) {
case IMSG_CTL_VERBOSE:
- log_verbose(*(int *)imsg->data);
- queue_pass_to_scheduler(iev, imsg);
+ m_msg(&m, imsg);
+ m_get_int(&m, &v);
+ m_end(&m);
+ log_verbose(v);
+ m_forward(p_scheduler, imsg);
+ return;
+
+ case IMSG_CTL_PROFILE:
+ m_msg(&m, imsg);
+ m_get_int(&m, &v);
+ m_end(&m);
+ profiling = v;
return;
}
}
@@ -302,26 +425,23 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg)
}
static void
-queue_pass_to_scheduler(struct imsgev *iev, struct imsg *imsg)
-{
- imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], imsg->hdr.type,
- iev->proc, imsg->hdr.pid, imsg->fd, imsg->data,
- imsg->hdr.len - sizeof imsg->hdr);
-}
-
-static void
-queue_bounce(struct envelope *e)
+queue_bounce(struct envelope *e, struct delivery_bounce *d)
{
struct envelope b;
- uint32_t msgid;
b = *e;
b.type = D_BOUNCE;
+ b.agent.bounce = *d;
b.retry = 0;
b.lasttry = 0;
b.creation = time(NULL);
b.expire = 3600 * 24 * 7;
+ if (b.id == 0)
+ log_warn("warn: queue_bounce: evpid=0");
+ if (evpid_to_msgid(b.id) == 0)
+ log_warn("warn: queue_bounce: msgid=0, evpid=%016"PRIx64,
+ b.id);
if (e->type == D_BOUNCE) {
log_warnx("warn: queue: double bounce!");
} else if (e->sender.user[0] == '\0') {
@@ -331,11 +451,16 @@ queue_bounce(struct envelope *e)
} else {
log_debug("debug: queue: bouncing evp:%016" PRIx64
" as evp:%016" PRIx64, e->id, b.id);
- imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
- IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, &b, sizeof b);
- msgid = evpid_to_msgid(b.id);
- imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
- IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &msgid, sizeof msgid);
+
+ m_create(p_scheduler, IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1,
+ MSZ_EVP);
+ m_add_envelope(p_scheduler, &b);
+ m_close(p_scheduler);
+
+ m_create(p_scheduler, IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, 5);
+ m_add_msgid(p_scheduler, evpid_to_msgid(b.id));
+ m_close(p_scheduler);
+
stat_increment("queue.bounce", 1);
}
}
@@ -360,6 +485,32 @@ queue_shutdown(void)
_exit(0);
}
+static void
+queue_set_sndbuf(struct mproc *p, int sz)
+{
+ int osz;
+ socklen_t sl;
+
+ sl = sizeof(osz);
+ if (getsockopt(p->imsgbuf.fd, SOL_SOCKET, SO_SNDBUF, &osz, &sl) == -1) {
+ log_warn("warn: getsockopt");
+ return;
+ }
+ if (osz == sz)
+ return;
+
+ if (setsockopt(p->imsgbuf.fd, SOL_SOCKET, SO_SNDBUF, &sz, sl) == -1) {
+ log_warn("warn: setsockopt");
+ return;
+ }
+ if (getsockopt(p->imsgbuf.fd, SOL_SOCKET, SO_SNDBUF, &sz, &sl) == -1) {
+ log_warn("warn: getsockopt");
+ return;
+ }
+ log_debug("debug: queue: adjusted output buffer size for %s: %i -> %i",
+ p->name, osz, sz);
+}
+
pid_t
queue(void)
{
@@ -370,36 +521,29 @@ queue(void)
struct event ev_sigint;
struct event ev_sigterm;
- struct peer peers[] = {
- { PROC_PARENT, imsg_dispatch },
- { PROC_CONTROL, imsg_dispatch },
- { PROC_SMTP, imsg_dispatch },
- { PROC_MDA, imsg_dispatch },
- { PROC_MTA, imsg_dispatch },
- { PROC_LKA, imsg_dispatch },
- { PROC_SCHEDULER, imsg_dispatch }
- };
-
switch (pid = fork()) {
case -1:
fatal("queue: cannot fork");
case 0:
+ env->sc_pid = getpid();
break;
default:
return (pid);
}
purge_config(PURGE_EVERYTHING);
+ if (env->sc_pwqueue) {
+ free(env->sc_pw);
+ env->sc_pw = env->sc_pwqueue;
+ }
pw = env->sc_pw;
-
if (chroot(PATH_SPOOL) == -1)
fatal("queue: chroot");
if (chdir("/") == -1)
fatal("queue: chdir(\"/\")");
- smtpd_process = PROC_QUEUE;
- setproctitle("%s", env->sc_title[smtpd_process]);
+ config_process(PROC_QUEUE);
if (setgroups(1, &pw->pw_gid) ||
setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) ||
@@ -418,8 +562,18 @@ queue(void)
fdlimit(1.0);
- config_pipes(peers, nitems(peers));
- config_peers(peers, nitems(peers));
+ config_peer(PROC_PARENT);
+ config_peer(PROC_CONTROL);
+ config_peer(PROC_SMTP);
+ config_peer(PROC_MDA);
+ config_peer(PROC_MTA);
+ config_peer(PROC_LKA);
+ config_peer(PROC_SCHEDULER);
+ config_done();
+
+ queue_set_sndbuf(p_scheduler, 65536);
+ queue_set_sndbuf(p_mta, 65536);
+ queue_set_sndbuf(p_mda, 65536);
/* setup queue loading task */
evtimer_set(&ev_qload, queue_timeout, &ev_qload);
@@ -445,25 +599,137 @@ queue_timeout(int fd, short event, void *p)
r = queue_envelope_walk(&evp);
if (r == -1) {
- if (msgid)
- imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
- IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &msgid,
- sizeof msgid);
+ if (msgid) {
+ m_create(p_scheduler, IMSG_QUEUE_COMMIT_MESSAGE,
+ 0, 0, -1, 5);
+ m_add_msgid(p_scheduler, msgid);
+ m_close(p_scheduler);
+ }
log_debug("debug: queue: done loading queue into scheduler");
return;
}
if (r) {
- if (msgid && evpid_to_msgid(evp.id) != msgid)
- imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
- IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &msgid,
- sizeof msgid);
+ if (msgid && evpid_to_msgid(evp.id) != msgid) {
+ m_create(p_scheduler, IMSG_QUEUE_COMMIT_MESSAGE,
+ 0, 0, -1, 5);
+ m_add_msgid(p_scheduler, msgid);
+ m_close(p_scheduler);
+ }
msgid = evpid_to_msgid(evp.id);
- imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
- IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, &evp, sizeof evp);
+ m_create(p_scheduler, IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1,
+ MSZ_EVP);
+ m_add_envelope(p_scheduler, &evp);
+ m_close(p_scheduler);
}
tv.tv_sec = 0;
tv.tv_usec = 10;
evtimer_add(ev, &tv);
}
+
+void
+queue_ok(uint64_t evpid)
+{
+ m_create(p_queue, IMSG_DELIVERY_OK, 0, 0, -1, sizeof(evpid) + 1);
+ m_add_evpid(p_queue, evpid);
+ m_close(p_queue);
+}
+
+void
+queue_tempfail(uint64_t evpid, const char *reason)
+{
+ m_create(p_queue, IMSG_DELIVERY_TEMPFAIL, 0, 0, -1,
+ sizeof(evpid) + strlen(reason) + 2);
+ m_add_evpid(p_queue, evpid);
+ m_add_string(p_queue, reason);
+ m_close(p_queue);
+}
+
+void
+queue_permfail(uint64_t evpid, const char *reason)
+{
+ m_create(p_queue, IMSG_DELIVERY_PERMFAIL, 0, 0, -1,
+ sizeof(evpid) + strlen(reason) + 2);
+ m_add_evpid(p_queue, evpid);
+ m_add_string(p_queue, reason);
+ m_close(p_queue);
+}
+
+void
+queue_loop(uint64_t evpid)
+{
+ m_create(p_queue, IMSG_DELIVERY_LOOP, 0, 0, -1, sizeof(evpid) + 1);
+ m_add_evpid(p_queue, evpid);
+ m_close(p_queue);
+}
+
+static void
+queue_log(const struct envelope *e, const char *prefix, const char *status)
+{
+ char rcpt[MAX_LINE_SIZE];
+
+ rcpt[0] = '\0';
+ if (strcmp(e->rcpt.user, e->dest.user) ||
+ strcmp(e->rcpt.domain, e->dest.domain))
+ snprintf(rcpt, sizeof rcpt, "rcpt=<%s@%s>, ",
+ e->rcpt.user, e->rcpt.domain);
+
+ log_info("%s: %s for %016" PRIx64 ": from=<%s@%s>, to=<%s@%s>, "
+ "%sdelay=%s, stat=%s",
+ e->type == D_MDA ? "delivery" : "relay",
+ prefix,
+ e->id, e->sender.user, e->sender.domain,
+ e->dest.user, e->dest.domain,
+ rcpt,
+ duration_to_text(time(NULL) - e->creation),
+ status);
+}
+
+void
+queue_flow_control(void)
+{
+ size_t bufsz;
+ int oldlimit = limit;
+ int set, unset;
+
+ bufsz = p_mda->bytes_queued + p_mta->bytes_queued;
+ if (bufsz <= flow_agent_lowat)
+ limit &= ~LIMIT_AGENT;
+ else if (bufsz > flow_agent_hiwat)
+ limit |= LIMIT_AGENT;
+
+ if (p_scheduler->bytes_queued <= flow_scheduler_lowat)
+ limit &= ~LIMIT_SCHEDULER;
+ else if (p_scheduler->bytes_queued > flow_scheduler_hiwat)
+ limit |= LIMIT_SCHEDULER;
+
+ set = limit & (limit ^ oldlimit);
+ unset = oldlimit & (limit ^ oldlimit);
+
+ if (set & LIMIT_SCHEDULER) {
+ log_warnx("warn: queue: Hiwat reached on scheduler buffer: "
+ "suspending transfer, delivery and lookup input");
+ mproc_disable(p_mta);
+ mproc_disable(p_mda);
+ mproc_disable(p_lka);
+ }
+ else if (unset & LIMIT_SCHEDULER) {
+ log_warnx("warn: queue: Down to lowat on scheduler buffer: "
+ "resuming transfer, delivery and lookup input");
+ mproc_enable(p_mta);
+ mproc_enable(p_mda);
+ mproc_enable(p_lka);
+ }
+
+ if (set & LIMIT_AGENT) {
+ log_warnx("warn: queue: Hiwat reached on transfer and delivery "
+ "buffers: suspending scheduler input");
+ mproc_disable(p_scheduler);
+ }
+ else if (unset & LIMIT_AGENT) {
+ log_warnx("warn: queue: Down to lowat on transfer and delivery "
+ "buffers: resuming scheduler input");
+ mproc_enable(p_scheduler);
+ }
+}