summaryrefslogtreecommitdiff
path: root/usr.sbin/smtpd/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'usr.sbin/smtpd/queue.c')
-rw-r--r--usr.sbin/smtpd/queue.c217
1 files changed, 147 insertions, 70 deletions
diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c
index c8c2d31403c..e51f9532422 100644
--- a/usr.sbin/smtpd/queue.c
+++ b/usr.sbin/smtpd/queue.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: queue.c,v 1.122 2012/08/08 08:50:42 eric Exp $ */
+/* $OpenBSD: queue.c,v 1.123 2012/08/09 09:48:02 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -41,6 +41,7 @@
static void queue_imsg(struct imsgev *, struct imsg *);
static void queue_timeout(int, short, void *);
+static void queue_bounce(struct envelope *);
static void queue_pass_to_scheduler(struct imsgev *, struct imsg *);
static void queue_shutdown(void);
static void queue_sig_handler(int, short, void *);
@@ -48,10 +49,12 @@ static void queue_sig_handler(int, short, void *);
static void
queue_imsg(struct imsgev *iev, struct imsg *imsg)
{
+ static struct mta_batch batch, *mta_batch;
struct submit_status ss;
- struct envelope *e;
- struct mta_batch *mta_batch;
+ struct envelope *e, evp;
int fd, ret;
+ uint64_t id;
+ uint32_t msgid;
log_imsg(PROC_QUEUE, iev->proc, imsg);
@@ -66,8 +69,8 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg)
ret = queue_message_create(&ss.u.msgid);
if (ret == 0)
ss.code = 421;
- imsg_compose_event(iev, IMSG_QUEUE_CREATE_MESSAGE, 0, 0, -1,
- &ss, sizeof ss);
+ imsg_compose_event(iev, IMSG_QUEUE_CREATE_MESSAGE, 0, 0,
+ -1, &ss, sizeof ss);
return;
case IMSG_QUEUE_REMOVE_MESSAGE:
@@ -77,18 +80,18 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg)
case IMSG_QUEUE_COMMIT_MESSAGE:
ss.id = e->session_id;
ss.code = 250;
- if (queue_message_commit(evpid_to_msgid(e->id)))
+ msgid = evpid_to_msgid(e->id);
+ if (queue_message_commit(msgid)) {
stat_increment(e->flags & DF_ENQUEUED ?
STATS_QUEUE_LOCAL : STATS_QUEUE_REMOTE);
- else
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1,
+ &msgid, sizeof msgid);
+ } else
ss.code = 421;
- imsg_compose_event(iev, IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1,
- &ss, sizeof ss);
-
- if (ss.code != 421)
- queue_pass_to_scheduler(iev, imsg);
-
+ imsg_compose_event(iev, IMSG_QUEUE_COMMIT_MESSAGE, 0, 0,
+ -1, &ss, sizeof ss);
return;
case IMSG_QUEUE_MESSAGE_FILE:
@@ -96,28 +99,33 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg)
fd = queue_message_fd_rw(evpid_to_msgid(e->id));
if (fd == -1)
ss.code = 421;
- imsg_compose_event(iev, IMSG_QUEUE_MESSAGE_FILE, 0, 0, fd,
- &ss, sizeof ss);
+ imsg_compose_event(iev, IMSG_QUEUE_MESSAGE_FILE, 0, 0,
+ fd, &ss, sizeof ss);
return;
case IMSG_SMTP_ENQUEUE:
- queue_pass_to_scheduler(iev, imsg);
+ id = *(uint64_t*)(imsg->data);
+ queue_envelope_load(id, &evp);
+ bounce_session(imsg->fd, &evp);
return;
}
}
if (iev->proc == PROC_LKA) {
e = imsg->data;
-
switch (imsg->hdr.type) {
case IMSG_QUEUE_SUBMIT_ENVELOPE:
- ss.id = e->session_id;
- ret = queue_envelope_create(e);
- if (ret == 0) {
+ if (!queue_envelope_create(e)) {
+ ss.id = e->session_id;
ss.code = 421;
imsg_compose_event(env->sc_ievs[PROC_SMTP],
IMSG_QUEUE_TEMPFAIL, 0, 0, -1, &ss,
sizeof ss);
+ } else {
+ /* tell the scheduler */
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, e,
+ sizeof *e);
}
return;
@@ -132,14 +140,64 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg)
}
if (iev->proc == PROC_SCHEDULER) {
- /* forward imsgs from scheduler on its behalf */
- imsg_compose_event(env->sc_ievs[imsg->hdr.peerid], imsg->hdr.type,
- 0, imsg->hdr.pid, imsg->fd, (char *)imsg->data,
- imsg->hdr.len - sizeof imsg->hdr);
- return;
+ switch (imsg->hdr.type) {
+ case IMSG_QUEUE_REMOVE:
+ evp.id = *(uint64_t*)(imsg->data);
+ queue_envelope_delete(&evp);
+ return;
+
+ case IMSG_QUEUE_EXPIRE:
+ id = *(uint64_t*)(imsg->data);
+ queue_envelope_load(id, &evp);
+ envelope_set_errormsg(&evp, "envelope expired");
+ queue_bounce(&evp);
+ queue_envelope_delete(&evp);
+ return;
+
+ case IMSG_MDA_SESS_NEW:
+ id = *(uint64_t*)(imsg->data);
+ queue_envelope_load(id, &evp);
+ evp.lasttry = time(NULL);
+ fd = queue_message_fd_r(evpid_to_msgid(id));
+ imsg_compose_event(env->sc_ievs[PROC_MDA],
+ IMSG_MDA_SESS_NEW, 0, 0, fd, &evp, sizeof evp);
+ return;
+
+ case IMSG_SMTP_ENQUEUE:
+ id = *(uint64_t*)(imsg->data);
+ imsg_compose_event(env->sc_ievs[PROC_SMTP],
+ IMSG_SMTP_ENQUEUE, 0, 0, -1, &id, sizeof id);
+ return;
+
+ case IMSG_BATCH_CREATE:
+ bzero(&batch, sizeof batch);
+ return;
+
+ case IMSG_BATCH_APPEND:
+ id = *(uint64_t*)(imsg->data);
+ queue_envelope_load(id, &evp);
+ if (!batch.id) {
+ batch.id = generate_uid();
+ batch.relay = evp.agent.mta.relay;
+ imsg_compose_event(env->sc_ievs[PROC_MTA],
+ IMSG_BATCH_CREATE, 0, 0, -1,
+ &batch, sizeof batch);
+ }
+ evp.lasttry = time(NULL);
+ evp.batch_id = batch.id;
+ imsg_compose_event(env->sc_ievs[PROC_MTA],
+ IMSG_BATCH_APPEND, 0, 0, -1, &evp, sizeof evp);
+ return;
+
+ case IMSG_BATCH_CLOSE:
+ imsg_compose_event(env->sc_ievs[PROC_MTA],
+ IMSG_BATCH_CLOSE, 0, 0, -1,
+ &batch.id, sizeof batch.id);
+ return;
+ }
}
- if (iev->proc == PROC_MTA) {
+ if (iev->proc == PROC_MTA || iev->proc == PROC_MDA) {
switch (imsg->hdr.type) {
case IMSG_QUEUE_MESSAGE_FD:
mta_batch = imsg->data;
@@ -149,21 +207,29 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg)
return;
case IMSG_QUEUE_DELIVERY_OK:
- case IMSG_QUEUE_DELIVERY_TEMPFAIL:
- case IMSG_QUEUE_DELIVERY_PERMFAIL:
- case IMSG_BATCH_DONE:
- queue_pass_to_scheduler(iev, imsg);
+ e = imsg->data;
+ queue_envelope_delete(e);
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_QUEUE_DELIVERY_OK, 0, 0, -1, &e->id,
+ sizeof e->id);
return;
- }
- }
- if (iev->proc == PROC_MDA) {
- switch (imsg->hdr.type) {
- case IMSG_QUEUE_DELIVERY_OK:
case IMSG_QUEUE_DELIVERY_TEMPFAIL:
+ e = imsg->data;
+ e->retry++;
+ queue_envelope_update(e);
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_QUEUE_DELIVERY_TEMPFAIL, 0, 0, -1, e,
+ sizeof *e);
+ return;
+
case IMSG_QUEUE_DELIVERY_PERMFAIL:
- case IMSG_MDA_SESS_NEW:
- queue_pass_to_scheduler(iev, imsg);
+ e = imsg->data;
+ queue_bounce(e);
+ queue_envelope_delete(e);
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_QUEUE_DELIVERY_PERMFAIL, 0, 0, -1, &e->id,
+ sizeof e->id);
return;
}
}
@@ -174,7 +240,6 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg)
case IMSG_QUEUE_PAUSE_MTA:
case IMSG_QUEUE_RESUME_MDA:
case IMSG_QUEUE_RESUME_MTA:
- case IMSG_QUEUE_SCHEDULE:
case IMSG_QUEUE_REMOVE:
queue_pass_to_scheduler(iev, imsg);
return;
@@ -202,6 +267,36 @@ queue_pass_to_scheduler(struct imsgev *iev, struct imsg *imsg)
}
static void
+queue_bounce(struct envelope *e)
+{
+ struct envelope b;
+ uint32_t msgid;
+
+ b = *e;
+ b.type = D_BOUNCE;
+ b.retry = 0;
+ b.lasttry = 0;
+ b.creation = time(NULL);
+ b.expire = 3600 * 24 * 7;
+
+ if (e->type == D_BOUNCE) {
+ log_warnx("queue: double bounce!");
+ } else if (e->sender.user[0] == '\0') {
+ log_warnx("queue: no return path!");
+ } else if (!queue_envelope_create(&b)) {
+ log_warnx("queue: cannot bounce!");
+ } else {
+ log_debug("queue: bouncing evp:%016" PRIx64
+ " as evp:%016" PRIx64, e->id, b.id);
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, &b, sizeof b);
+ msgid = evpid_to_msgid(b.id);
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &msgid, sizeof msgid);
+ }
+}
+
+static void
queue_sig_handler(int sig, short event, void *p)
{
switch (sig) {
@@ -302,12 +397,12 @@ queue(void)
return (0);
}
-void
+static void
queue_timeout(int fd, short event, void *p)
{
static struct qwalk *q = NULL;
+ static uint32_t last_msgid = 0;
struct event *ev = p;
- static uint64_t last_evpid = 0;
struct envelope envelope;
struct timeval tv;
uint64_t evpid;
@@ -318,47 +413,29 @@ queue_timeout(int fd, short event, void *p)
}
while (qwalk(q, &evpid)) {
- if (! queue_envelope_load(evpid, &envelope))
- continue;
-
- if (evpid_to_msgid(evpid) != evpid_to_msgid(last_evpid) &&
- last_evpid != 0) {
- envelope.id = last_evpid;
+ if (queue_envelope_load(evpid, &envelope))
imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
- IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &envelope,
+ IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, &envelope,
sizeof envelope);
- }
- last_evpid = evpid;
+ if (last_msgid && evpid_to_msgid(evpid) != last_msgid)
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &last_msgid,
+ sizeof last_msgid);
+
+ last_msgid = evpid_to_msgid(evpid);
tv.tv_sec = 0;
tv.tv_usec = 0;
evtimer_add(ev, &tv);
return;
}
- if (last_evpid) {
- envelope.id = last_evpid;
+ if (last_msgid) {
imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
- IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &envelope,
- sizeof envelope);
+ IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &last_msgid,
+ sizeof last_msgid);
}
- log_info("queue: done loading queue into scheduler");
+ log_debug("queue: done loading queue into scheduler");
qwalk_close(q);
}
-
-void
-queue_submit_envelope(struct envelope *ep)
-{
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1,
- ep, sizeof(*ep));
-}
-
-void
-queue_commit_envelopes(struct envelope *ep)
-{
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_QUEUE_COMMIT_ENVELOPES, 0, 0, -1,
- ep, sizeof(*ep));
-}