summary | refs | log | tree | commit | diff
path: root/usr.sbin
diff options
context:
space:
mode:
author Eric Faurot <eric@cvs.openbsd.org> 2012-08-09 09:48:03 +0000
committer Eric Faurot <eric@cvs.openbsd.org> 2012-08-09 09:48:03 +0000
commit 228501da23f4c47dfd29423d4a0ec553fc140549 (patch)
tree 11df7046d7866f0ef41dcf2ea8e17c013d42f687 /usr.sbin
parent daeaa7f23c7c4ef46a99aded43a1464ff29377f2 (diff)
Improve the message flows to completely isolate operations on the
queue backend within the queue process. The scheduler sends envelope ids to the queue process, which loads the envelope and forwards the request to the agent responsible for the delivery. The result is sent by the agent to the queue, which updates the storage before notifying the scheduler. Bounces are created and enqueued (from the client side) by the queue process, rather than the scheduler. ok gilles@
Diffstat (limited to 'usr.sbin')
-rw-r--r-- usr.sbin/smtpd/bounce.c 43
-rw-r--r-- usr.sbin/smtpd/lka_session.c 9
-rw-r--r-- usr.sbin/smtpd/mda.c 6
-rw-r--r-- usr.sbin/smtpd/mta_session.c 5
-rw-r--r-- usr.sbin/smtpd/queue.c 217
-rw-r--r-- usr.sbin/smtpd/scheduler.c 219
-rw-r--r-- usr.sbin/smtpd/smtp.c 4
-rw-r--r-- usr.sbin/smtpd/smtpd.c 5
-rw-r--r-- usr.sbin/smtpd/smtpd.h 8
9 files changed, 248 insertions, 268 deletions
diff --git a/usr.sbin/smtpd/bounce.c b/usr.sbin/smtpd/bounce.c
index f31b322ef29..c0248cfb3e7 100644
--- a/usr.sbin/smtpd/bounce.c
+++ b/usr.sbin/smtpd/bounce.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: bounce.c,v 1.43 2012/08/08 08:50:42 eric Exp $ */
+/* $OpenBSD: bounce.c,v 1.44 2012/08/09 09:48:02 eric Exp $ */
/*
* Copyright (c) 2009 Gilles Chehade <gilles@openbsd.org>
@@ -60,10 +60,6 @@ struct bounce {
struct io io;
};
-/* XXX remove later */
-void scheduler_envelope_update(struct envelope *);
-void scheduler_envelope_delete(struct envelope *);
-
static void bounce_send(struct bounce *, const char *, ...);
static int bounce_next(struct bounce *);
static void bounce_status(struct bounce *, const char *, ...);
@@ -197,9 +193,10 @@ bounce_next(struct bounce *bounce)
static void
bounce_status(struct bounce *bounce, const char *fmt, ...)
{
- va_list ap;
- char *status;
- int len;
+ va_list ap;
+ char *status;
+ int len, msg;
+ struct envelope *evp;
/* ignore if the envelope has already been updated/deleted */
if (bounce->evp.id == 0)
@@ -210,17 +207,26 @@ bounce_status(struct bounce *bounce, const char *fmt, ...)
fatal("bounce: vasprintf");
va_end(ap);
- if (*status == '2' || *status == '5' || *status == '6') {
- log_debug("#### %s: queue_envelope_delete: %016" PRIx64,
- __func__, bounce->evp.id);
- queue_envelope_delete(&bounce->evp);
- scheduler_envelope_delete(&bounce->evp);
+ if (*status == '2')
+ msg = IMSG_QUEUE_DELIVERY_OK;
+ else if (*status == '5' || *status == '6')
+ msg = IMSG_QUEUE_DELIVERY_PERMFAIL;
+ else
+ msg = IMSG_QUEUE_DELIVERY_TEMPFAIL;
+
+ evp = &bounce->evp;
+ if (msg == IMSG_QUEUE_DELIVERY_TEMPFAIL) {
+ evp->retry++;
+ envelope_set_errormsg(evp, "%s", status);
+ queue_envelope_update(evp);
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], msg, 0, 0, -1,
+ evp, sizeof *evp);
} else {
- bounce->evp.retry++;
- envelope_set_errormsg(&bounce->evp, "%s", status);
- queue_envelope_update(&bounce->evp);
- scheduler_envelope_update(&bounce->evp);
+ queue_envelope_delete(evp);
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], msg, 0, 0, -1,
+ &evp->id, sizeof evp->id);
}
+
bounce->evp.id = 0;
free(status);
}
@@ -234,9 +240,6 @@ bounce_free(struct bounce *bounce)
iobuf_clear(&bounce->iobuf);
io_clear(&bounce->io);
free(bounce);
-
- stat_decrement(STATS_SCHEDULER);
- stat_decrement(STATS_SCHEDULER_BOUNCES);
}
static void
diff --git a/usr.sbin/smtpd/lka_session.c b/usr.sbin/smtpd/lka_session.c
index f0eec8e037e..c85a8572e1d 100644
--- a/usr.sbin/smtpd/lka_session.c
+++ b/usr.sbin/smtpd/lka_session.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: lka_session.c,v 1.19 2012/07/29 17:21:43 gilles Exp $ */
+/* $OpenBSD: lka_session.c,v 1.20 2012/08/09 09:48:02 eric Exp $ */
/*
* Copyright (c) 2011 Gilles Chehade <gilles@openbsd.org>
@@ -310,11 +310,14 @@ lka_session_done(struct lka_session *lks)
/* process the delivery list and submit envelopes to queue */
while ((ep = TAILQ_FIRST(&lks->deliverylist)) != NULL) {
- queue_submit_envelope(ep);
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE],
+ IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, ep, sizeof *ep);
TAILQ_REMOVE(&lks->deliverylist, ep, entry);
free(ep);
}
- queue_commit_envelopes(&lks->ss.envelope);
+ ep = &lks->ss.envelope;
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE],
+ IMSG_QUEUE_COMMIT_ENVELOPES, 0, 0, -1, ep, sizeof *ep);
done:
if (lks->flags & F_ERROR) {
diff --git a/usr.sbin/smtpd/mda.c b/usr.sbin/smtpd/mda.c
index 8ebe76d607a..df5e20fca29 100644
--- a/usr.sbin/smtpd/mda.c
+++ b/usr.sbin/smtpd/mda.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: mda.c,v 1.67 2012/01/13 14:01:57 eric Exp $ */
+/* $OpenBSD: mda.c,v 1.68 2012/08/09 09:48:02 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -236,10 +236,6 @@ mda_imsg(struct imsgev *iev, struct imsg *imsg)
msgbuf_clear(&s->w);
event_del(&s->ev);
free(s);
-
- /* update queue's session count */
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_MDA_SESS_NEW, 0, 0, -1, NULL, 0);
return;
case IMSG_CTL_VERBOSE:
diff --git a/usr.sbin/smtpd/mta_session.c b/usr.sbin/smtpd/mta_session.c
index d99a08f8ee3..6345ae24d87 100644
--- a/usr.sbin/smtpd/mta_session.c
+++ b/usr.sbin/smtpd/mta_session.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: mta_session.c,v 1.6 2012/07/29 20:16:02 eric Exp $ */
+/* $OpenBSD: mta_session.c,v 1.7 2012/08/09 09:48:02 eric Exp $ */
/*
* Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
@@ -439,9 +439,6 @@ mta_enter_state(struct mta_session *s, int newstate)
if (TAILQ_FIRST(&s->tasks))
fatalx("all tasks should have been deleted already");
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_BATCH_DONE, 0, 0, -1, NULL, 0);
-
/* deallocate resources */
SPLAY_REMOVE(mta_session_tree, &mta_sessions, s);
while ((relay = TAILQ_FIRST(&s->relays))) {
diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c
index c8c2d31403c..e51f9532422 100644
--- a/usr.sbin/smtpd/queue.c
+++ b/usr.sbin/smtpd/queue.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: queue.c,v 1.122 2012/08/08 08:50:42 eric Exp $ */
+/* $OpenBSD: queue.c,v 1.123 2012/08/09 09:48:02 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -41,6 +41,7 @@
static void queue_imsg(struct imsgev *, struct imsg *);
static void queue_timeout(int, short, void *);
+static void queue_bounce(struct envelope *);
static void queue_pass_to_scheduler(struct imsgev *, struct imsg *);
static void queue_shutdown(void);
static void queue_sig_handler(int, short, void *);
@@ -48,10 +49,12 @@ static void queue_sig_handler(int, short, void *);
static void
queue_imsg(struct imsgev *iev, struct imsg *imsg)
{
+ static struct mta_batch batch, *mta_batch;
struct submit_status ss;
- struct envelope *e;
- struct mta_batch *mta_batch;
+ struct envelope *e, evp;
int fd, ret;
+ uint64_t id;
+ uint32_t msgid;
log_imsg(PROC_QUEUE, iev->proc, imsg);
@@ -66,8 +69,8 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg)
ret = queue_message_create(&ss.u.msgid);
if (ret == 0)
ss.code = 421;
- imsg_compose_event(iev, IMSG_QUEUE_CREATE_MESSAGE, 0, 0, -1,
- &ss, sizeof ss);
+ imsg_compose_event(iev, IMSG_QUEUE_CREATE_MESSAGE, 0, 0,
+ -1, &ss, sizeof ss);
return;
case IMSG_QUEUE_REMOVE_MESSAGE:
@@ -77,18 +80,18 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg)
case IMSG_QUEUE_COMMIT_MESSAGE:
ss.id = e->session_id;
ss.code = 250;
- if (queue_message_commit(evpid_to_msgid(e->id)))
+ msgid = evpid_to_msgid(e->id);
+ if (queue_message_commit(msgid)) {
stat_increment(e->flags & DF_ENQUEUED ?
STATS_QUEUE_LOCAL : STATS_QUEUE_REMOTE);
- else
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1,
+ &msgid, sizeof msgid);
+ } else
ss.code = 421;
- imsg_compose_event(iev, IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1,
- &ss, sizeof ss);
-
- if (ss.code != 421)
- queue_pass_to_scheduler(iev, imsg);
-
+ imsg_compose_event(iev, IMSG_QUEUE_COMMIT_MESSAGE, 0, 0,
+ -1, &ss, sizeof ss);
return;
case IMSG_QUEUE_MESSAGE_FILE:
@@ -96,28 +99,33 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg)
fd = queue_message_fd_rw(evpid_to_msgid(e->id));
if (fd == -1)
ss.code = 421;
- imsg_compose_event(iev, IMSG_QUEUE_MESSAGE_FILE, 0, 0, fd,
- &ss, sizeof ss);
+ imsg_compose_event(iev, IMSG_QUEUE_MESSAGE_FILE, 0, 0,
+ fd, &ss, sizeof ss);
return;
case IMSG_SMTP_ENQUEUE:
- queue_pass_to_scheduler(iev, imsg);
+ id = *(uint64_t*)(imsg->data);
+ queue_envelope_load(id, &evp);
+ bounce_session(imsg->fd, &evp);
return;
}
}
if (iev->proc == PROC_LKA) {
e = imsg->data;
-
switch (imsg->hdr.type) {
case IMSG_QUEUE_SUBMIT_ENVELOPE:
- ss.id = e->session_id;
- ret = queue_envelope_create(e);
- if (ret == 0) {
+ if (!queue_envelope_create(e)) {
+ ss.id = e->session_id;
ss.code = 421;
imsg_compose_event(env->sc_ievs[PROC_SMTP],
IMSG_QUEUE_TEMPFAIL, 0, 0, -1, &ss,
sizeof ss);
+ } else {
+ /* tell the scheduler */
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, e,
+ sizeof *e);
}
return;
@@ -132,14 +140,64 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg)
}
if (iev->proc == PROC_SCHEDULER) {
- /* forward imsgs from scheduler on its behalf */
- imsg_compose_event(env->sc_ievs[imsg->hdr.peerid], imsg->hdr.type,
- 0, imsg->hdr.pid, imsg->fd, (char *)imsg->data,
- imsg->hdr.len - sizeof imsg->hdr);
- return;
+ switch (imsg->hdr.type) {
+ case IMSG_QUEUE_REMOVE:
+ evp.id = *(uint64_t*)(imsg->data);
+ queue_envelope_delete(&evp);
+ return;
+
+ case IMSG_QUEUE_EXPIRE:
+ id = *(uint64_t*)(imsg->data);
+ queue_envelope_load(id, &evp);
+ envelope_set_errormsg(&evp, "envelope expired");
+ queue_bounce(&evp);
+ queue_envelope_delete(&evp);
+ return;
+
+ case IMSG_MDA_SESS_NEW:
+ id = *(uint64_t*)(imsg->data);
+ queue_envelope_load(id, &evp);
+ evp.lasttry = time(NULL);
+ fd = queue_message_fd_r(evpid_to_msgid(id));
+ imsg_compose_event(env->sc_ievs[PROC_MDA],
+ IMSG_MDA_SESS_NEW, 0, 0, fd, &evp, sizeof evp);
+ return;
+
+ case IMSG_SMTP_ENQUEUE:
+ id = *(uint64_t*)(imsg->data);
+ imsg_compose_event(env->sc_ievs[PROC_SMTP],
+ IMSG_SMTP_ENQUEUE, 0, 0, -1, &id, sizeof id);
+ return;
+
+ case IMSG_BATCH_CREATE:
+ bzero(&batch, sizeof batch);
+ return;
+
+ case IMSG_BATCH_APPEND:
+ id = *(uint64_t*)(imsg->data);
+ queue_envelope_load(id, &evp);
+ if (!batch.id) {
+ batch.id = generate_uid();
+ batch.relay = evp.agent.mta.relay;
+ imsg_compose_event(env->sc_ievs[PROC_MTA],
+ IMSG_BATCH_CREATE, 0, 0, -1,
+ &batch, sizeof batch);
+ }
+ evp.lasttry = time(NULL);
+ evp.batch_id = batch.id;
+ imsg_compose_event(env->sc_ievs[PROC_MTA],
+ IMSG_BATCH_APPEND, 0, 0, -1, &evp, sizeof evp);
+ return;
+
+ case IMSG_BATCH_CLOSE:
+ imsg_compose_event(env->sc_ievs[PROC_MTA],
+ IMSG_BATCH_CLOSE, 0, 0, -1,
+ &batch.id, sizeof batch.id);
+ return;
+ }
}
- if (iev->proc == PROC_MTA) {
+ if (iev->proc == PROC_MTA || iev->proc == PROC_MDA) {
switch (imsg->hdr.type) {
case IMSG_QUEUE_MESSAGE_FD:
mta_batch = imsg->data;
@@ -149,21 +207,29 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg)
return;
case IMSG_QUEUE_DELIVERY_OK:
- case IMSG_QUEUE_DELIVERY_TEMPFAIL:
- case IMSG_QUEUE_DELIVERY_PERMFAIL:
- case IMSG_BATCH_DONE:
- queue_pass_to_scheduler(iev, imsg);
+ e = imsg->data;
+ queue_envelope_delete(e);
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_QUEUE_DELIVERY_OK, 0, 0, -1, &e->id,
+ sizeof e->id);
return;
- }
- }
- if (iev->proc == PROC_MDA) {
- switch (imsg->hdr.type) {
- case IMSG_QUEUE_DELIVERY_OK:
case IMSG_QUEUE_DELIVERY_TEMPFAIL:
+ e = imsg->data;
+ e->retry++;
+ queue_envelope_update(e);
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_QUEUE_DELIVERY_TEMPFAIL, 0, 0, -1, e,
+ sizeof *e);
+ return;
+
case IMSG_QUEUE_DELIVERY_PERMFAIL:
- case IMSG_MDA_SESS_NEW:
- queue_pass_to_scheduler(iev, imsg);
+ e = imsg->data;
+ queue_bounce(e);
+ queue_envelope_delete(e);
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_QUEUE_DELIVERY_PERMFAIL, 0, 0, -1, &e->id,
+ sizeof e->id);
return;
}
}
@@ -174,7 +240,6 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg)
case IMSG_QUEUE_PAUSE_MTA:
case IMSG_QUEUE_RESUME_MDA:
case IMSG_QUEUE_RESUME_MTA:
- case IMSG_QUEUE_SCHEDULE:
case IMSG_QUEUE_REMOVE:
queue_pass_to_scheduler(iev, imsg);
return;
@@ -202,6 +267,36 @@ queue_pass_to_scheduler(struct imsgev *iev, struct imsg *imsg)
}
static void
+queue_bounce(struct envelope *e)
+{
+ struct envelope b;
+ uint32_t msgid;
+
+ b = *e;
+ b.type = D_BOUNCE;
+ b.retry = 0;
+ b.lasttry = 0;
+ b.creation = time(NULL);
+ b.expire = 3600 * 24 * 7;
+
+ if (e->type == D_BOUNCE) {
+ log_warnx("queue: double bounce!");
+ } else if (e->sender.user[0] == '\0') {
+ log_warnx("queue: no return path!");
+ } else if (!queue_envelope_create(&b)) {
+ log_warnx("queue: cannot bounce!");
+ } else {
+ log_debug("queue: bouncing evp:%016" PRIx64
+ " as evp:%016" PRIx64, e->id, b.id);
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, &b, sizeof b);
+ msgid = evpid_to_msgid(b.id);
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &msgid, sizeof msgid);
+ }
+}
+
+static void
queue_sig_handler(int sig, short event, void *p)
{
switch (sig) {
@@ -302,12 +397,12 @@ queue(void)
return (0);
}
-void
+static void
queue_timeout(int fd, short event, void *p)
{
static struct qwalk *q = NULL;
+ static uint32_t last_msgid = 0;
struct event *ev = p;
- static uint64_t last_evpid = 0;
struct envelope envelope;
struct timeval tv;
uint64_t evpid;
@@ -318,47 +413,29 @@ queue_timeout(int fd, short event, void *p)
}
while (qwalk(q, &evpid)) {
- if (! queue_envelope_load(evpid, &envelope))
- continue;
-
- if (evpid_to_msgid(evpid) != evpid_to_msgid(last_evpid) &&
- last_evpid != 0) {
- envelope.id = last_evpid;
+ if (queue_envelope_load(evpid, &envelope))
imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
- IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &envelope,
+ IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, &envelope,
sizeof envelope);
- }
- last_evpid = evpid;
+ if (last_msgid && evpid_to_msgid(evpid) != last_msgid)
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &last_msgid,
+ sizeof last_msgid);
+
+ last_msgid = evpid_to_msgid(evpid);
tv.tv_sec = 0;
tv.tv_usec = 0;
evtimer_add(ev, &tv);
return;
}
- if (last_evpid) {
- envelope.id = last_evpid;
+ if (last_msgid) {
imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
- IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &envelope,
- sizeof envelope);
+ IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &last_msgid,
+ sizeof last_msgid);
}
- log_info("queue: done loading queue into scheduler");
+ log_debug("queue: done loading queue into scheduler");
qwalk_close(q);
}
-
-void
-queue_submit_envelope(struct envelope *ep)
-{
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1,
- ep, sizeof(*ep));
-}
-
-void
-queue_commit_envelopes(struct envelope *ep)
-{
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_QUEUE_COMMIT_ENVELOPES, 0, 0, -1,
- ep, sizeof(*ep));
-}
diff --git a/usr.sbin/smtpd/scheduler.c b/usr.sbin/smtpd/scheduler.c
index 9efaf8029a4..8dbf95d6880 100644
--- a/usr.sbin/smtpd/scheduler.c
+++ b/usr.sbin/smtpd/scheduler.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: scheduler.c,v 1.8 2012/08/08 08:50:42 eric Exp $ */
+/* $OpenBSD: scheduler.c,v 1.9 2012/08/09 09:48:02 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -57,9 +57,6 @@ static void scheduler_process_mda(struct scheduler_batch *);
static void scheduler_process_mta(struct scheduler_batch *);
static int scheduler_load_message(u_int32_t);
-void scheduler_envelope_update(struct envelope *);
-void scheduler_envelope_delete(struct envelope *);
-
static struct scheduler_backend *backend = NULL;
extern const char *backend_scheduler;
@@ -67,127 +64,104 @@ extern const char *backend_scheduler;
void
scheduler_imsg(struct imsgev *iev, struct imsg *imsg)
{
- struct envelope *e, bounce;
- struct scheduler_info si;
+ struct envelope *e;
+ struct scheduler_info si;
+ uint64_t id;
+ uint32_t msgid;
log_imsg(PROC_SCHEDULER, iev->proc, imsg);
switch (imsg->hdr.type) {
- case IMSG_QUEUE_COMMIT_MESSAGE:
+
+ case IMSG_QUEUE_SUBMIT_ENVELOPE:
e = imsg->data;
log_trace(TRACE_SCHEDULER,
- "scheduler: IMSG_QUEUE_COMMIT_MESSAGE: %016"PRIx64, e->id);
- scheduler_load_message(evpid_to_msgid(e->id));
+ "scheduler: inserting evp:%016" PRIx64, e->id);
+ scheduler_info(&si, e);
+ backend->insert(&si);
+ return;
+
+ case IMSG_QUEUE_COMMIT_MESSAGE:
+ msgid = *(uint32_t *)(imsg->data);
+ log_trace(TRACE_SCHEDULER,
+ "scheduler: commiting msg:%08" PRIx32, msgid);
+ backend->commit(msgid);
+ scheduler_reset_events();
+ return;
+
+ case IMSG_QUEUE_TEMPFAIL:
+ msgid = *(uint32_t *)(imsg->data);
+ log_trace(TRACE_SCHEDULER, "scheduler: aborting msg:%08" PRIx32,
+ msgid);
+ backend->rollback(msgid);
scheduler_reset_events();
return;
case IMSG_QUEUE_DELIVERY_OK:
- stat_decrement(STATS_SCHEDULER);
- e = imsg->data;
+ id = *(uint64_t *)(imsg->data);
log_trace(TRACE_SCHEDULER,
- "scheduler: IMSG_QUEUE_DELIVERY_OK: %016"PRIx64, e->id);
- backend->delete(e->id);
- queue_envelope_delete(e);
+ "scheduler: deleting evp:%016" PRIx64 " (ok)", id);
+ backend->delete(id);
scheduler_reset_events();
return;
case IMSG_QUEUE_DELIVERY_TEMPFAIL:
- stat_decrement(STATS_SCHEDULER);
e = imsg->data;
log_trace(TRACE_SCHEDULER,
- "scheduler: IMSG_QUEUE_DELIVERY_TEMPFAIL: %016"PRIx64, e->id);
- e->retry++;
- queue_envelope_update(e);
+ "scheduler: updating evp:%016" PRIx64, e->id);
scheduler_info(&si, e);
backend->update(&si);
scheduler_reset_events();
return;
case IMSG_QUEUE_DELIVERY_PERMFAIL:
- stat_decrement(STATS_SCHEDULER);
- e = imsg->data;
+ id = *(uint64_t *)(imsg->data);
log_trace(TRACE_SCHEDULER,
- "scheduler: IMSG_QUEUE_DELIVERY_PERMFAIL: %016"PRIx64, e->id);
- if (e->type != D_BOUNCE && e->sender.user[0] != '\0') {
- bounce_record_message(e, &bounce);
- scheduler_info(&si, &bounce);
- backend->insert(&si);
- backend->commit(evpid_to_msgid(bounce.id));
- }
- backend->delete(e->id);
- queue_envelope_delete(e);
- scheduler_reset_events();
- return;
-
- case IMSG_MDA_SESS_NEW:
- log_trace(TRACE_SCHEDULER, "scheduler: IMSG_MDA_SESS_NEW");
- stat_decrement(STATS_MDA_SESSION);
- if (env->sc_maxconn - stat_get(STATS_MDA_SESSION, STAT_ACTIVE))
- env->sc_flags &= ~SMTPD_MDA_BUSY;
+ "scheduler: deleting evp:%016" PRIx64 " (fail)", id);
+ backend->delete(id);
scheduler_reset_events();
return;
- case IMSG_BATCH_DONE:
- log_trace(TRACE_SCHEDULER, "scheduler: IMSG_BATCH_DONE");
- stat_decrement(STATS_MTA_SESSION);
- if (env->sc_maxconn - stat_get(STATS_MTA_SESSION, STAT_ACTIVE))
- env->sc_flags &= ~SMTPD_MTA_BUSY;
- scheduler_reset_events();
- return;
-
- case IMSG_SMTP_ENQUEUE:
- e = imsg->data;
- log_trace(TRACE_SCHEDULER,
- "scheduler: IMSG_SMTP_ENQUEUE: %016"PRIx64, e->id);
- if (imsg->fd < 0 || !bounce_session(imsg->fd, e)) {
- queue_envelope_update(e);
- scheduler_info(&si, e);
- backend->update(&si);
- scheduler_reset_events();
- return;
- }
- return;
-
case IMSG_QUEUE_PAUSE_MDA:
- log_trace(TRACE_SCHEDULER, "scheduler: IMSG_QUEUE_PAUSE_MDA");
+ log_trace(TRACE_SCHEDULER, "scheduler: pausing mda");
env->sc_flags |= SMTPD_MDA_PAUSED;
return;
case IMSG_QUEUE_RESUME_MDA:
- log_trace(TRACE_SCHEDULER, "scheduler: IMSG_QUEUE_RESUME_MDA");
+ log_trace(TRACE_SCHEDULER, "scheduler: resuming mda");
env->sc_flags &= ~SMTPD_MDA_PAUSED;
scheduler_reset_events();
return;
case IMSG_QUEUE_PAUSE_MTA:
- log_trace(TRACE_SCHEDULER, "scheduler: IMSG_QUEUE_PAUSE_MTA");
+ log_trace(TRACE_SCHEDULER, "scheduler: pausing mta");
env->sc_flags |= SMTPD_MTA_PAUSED;
return;
case IMSG_QUEUE_RESUME_MTA:
- log_trace(TRACE_SCHEDULER, "scheduler: IMSG_QUEUE_RESUME_MTA");
+ log_trace(TRACE_SCHEDULER, "scheduler: resuming mta");
env->sc_flags &= ~SMTPD_MTA_PAUSED;
scheduler_reset_events();
return;
case IMSG_CTL_VERBOSE:
- log_trace(TRACE_SCHEDULER, "scheduler: IMSG_CTL_VERBOSE");
log_verbose(*(int *)imsg->data);
return;
case IMSG_SCHEDULER_SCHEDULE:
- log_trace(TRACE_SCHEDULER,
- "scheduler: IMSG_SCHEDULER_SCHEDULE: %016"PRIx64,
- *(u_int64_t *)imsg->data);
- backend->schedule(*(u_int64_t *)imsg->data);
+ id = *(uint64_t *)(imsg->data);
+ log_debug("scheduler: scheduling evp:%016" PRIx64, id);
+ backend->schedule(id);
scheduler_reset_events();
return;
case IMSG_SCHEDULER_REMOVE:
- log_trace(TRACE_SCHEDULER,
- "scheduler: IMSG_SCHEDULER_REMOVE: %016"PRIx64,
- *(u_int64_t *)imsg->data);
- backend->remove(*(u_int64_t *)imsg->data);
+ id = *(uint64_t *)(imsg->data);
+ if (id <= 0xffffffffL)
+ log_debug("scheduler: removing msg:%08" PRIx64, id);
+ else
+ log_debug("scheduler: removing evp:%016" PRIx64, id);
+ backend->remove(id);
scheduler_reset_events();
return;
}
@@ -358,15 +332,15 @@ scheduler_timeout(int fd, short event, void *p)
static void
scheduler_process_remove(struct scheduler_batch *batch)
{
- struct envelope evp;
struct id_list *e;
while ((e = batch->evpids)) {
batch->evpids = e->next;
- log_debug("scheduler: deleting evp:%016" PRIx64 " (removed)",
+ log_debug("scheduler: evp:%016" PRIx64 " removed",
e->id);
- evp.id = e->id;
- queue_envelope_delete(&evp);
+ backend->delete(e->id);
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_QUEUE_REMOVE,
+ 0, 0, -1, &e->id, sizeof e->id);
free(e);
}
}
@@ -374,15 +348,15 @@ scheduler_process_remove(struct scheduler_batch *batch)
static void
scheduler_process_expire(struct scheduler_batch *batch)
{
- struct envelope evp;
struct id_list *e;
while ((e = batch->evpids)) {
batch->evpids = e->next;
- log_debug("scheduler: deleting evp:%016" PRIx64 " (expire)",
+ log_debug("scheduler: evp:%016" PRIx64 " expired",
e->id);
- evp.id = e->id;
- queue_envelope_delete(&evp);
+ backend->delete(e->id);
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_QUEUE_EXPIRE,
+ 0, 0, -1, &e->id, sizeof e->id);
free(e);
}
}
@@ -390,20 +364,14 @@ scheduler_process_expire(struct scheduler_batch *batch)
static void
scheduler_process_bounce(struct scheduler_batch *batch)
{
- struct envelope evp;
struct id_list *e;
while ((e = batch->evpids)) {
batch->evpids = e->next;
- log_debug("scheduler: scheduling evp:%016" PRIx64 " (bounce)",
+ log_debug("scheduler: evp:%016" PRIx64 " scheduled (bounce)",
e->id);
- queue_envelope_load(e->id, &evp);
- evp.lasttry = time(NULL);
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_SMTP_ENQUEUE, PROC_SMTP, 0, -1, &evp,
- sizeof evp);
- stat_increment(STATS_SCHEDULER);
- stat_increment(STATS_SCHEDULER_BOUNCES);
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_SMTP_ENQUEUE,
+ 0, 0, -1, &e->id, sizeof e->id);
free(e);
}
}
@@ -411,22 +379,14 @@ scheduler_process_bounce(struct scheduler_batch *batch)
static void
scheduler_process_mda(struct scheduler_batch *batch)
{
- struct envelope evp;
struct id_list *e;
- int fd;
while ((e = batch->evpids)) {
batch->evpids = e->next;
- log_debug("scheduler: scheduling evp:%016" PRIx64 " (mda)",
+ log_debug("scheduler: evp:%016" PRIx64 " scheduled (mda)",
e->id);
- queue_envelope_load(e->id, &evp);
- evp.lasttry = time(NULL);
- fd = queue_message_fd_r(evpid_to_msgid(evp.id));
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_MDA_SESS_NEW, PROC_MDA, 0, fd, &evp,
- sizeof evp);
- stat_increment(STATS_SCHEDULER);
- stat_increment(STATS_MDA_SESSION);
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_MDA_SESS_NEW,
+ 0, 0, -1, &e->id, sizeof e->id);
free(e);
}
}
@@ -434,73 +394,22 @@ scheduler_process_mda(struct scheduler_batch *batch)
static void
scheduler_process_mta(struct scheduler_batch *batch)
{
- struct envelope evp;
- struct mta_batch mta_batch;
struct id_list *e;
- queue_envelope_load(batch->evpids->id, &evp);
-
- bzero(&mta_batch, sizeof mta_batch);
- mta_batch.id = arc4random();
- mta_batch.relay = evp.agent.mta.relay;
-
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_BATCH_CREATE, PROC_MTA, 0, -1, &mta_batch,
- sizeof mta_batch);
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_BATCH_CREATE,
+ 0, 0, -1, NULL, 0);
while ((e = batch->evpids)) {
batch->evpids = e->next;
- log_debug("scheduler: scheduling evp:%016" PRIx64 " (mta)",
+ log_debug("scheduler: evp:%016" PRIx64 " scheduled (mta)",
e->id);
- queue_envelope_load(e->id, &evp);
- evp.lasttry = time(NULL);
- evp.batch_id = mta_batch.id;
imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_BATCH_APPEND,
- PROC_MTA, 0, -1, &evp, sizeof evp);
+ 0, 0, -1, &e->id, sizeof e->id);
free(e);
- stat_increment(STATS_SCHEDULER);
}
imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_BATCH_CLOSE,
- PROC_MTA, 0, -1, &mta_batch, sizeof mta_batch);
+ 0, 0, -1, NULL, 0);
stat_increment(STATS_MTA_SESSION);
}
-
-void
-scheduler_envelope_update(struct envelope *e)
-{
- struct scheduler_info si;
-
- scheduler_info(&si, e);
- backend->update(&si);
- scheduler_reset_events();
-}
-
-void
-scheduler_envelope_delete(struct envelope *e)
-{
- backend->delete(e->id);
- scheduler_reset_events();
-}
-
-static int
-scheduler_load_message(u_int32_t msgid)
-{
- struct qwalk *q;
- u_int64_t evpid;
- struct envelope envelope;
- struct scheduler_info si;
-
- q = qwalk_new(msgid);
- while (qwalk(q, &evpid)) {
- if (! queue_envelope_load(evpid, &envelope))
- continue;
- scheduler_info(&si, &envelope);
- backend->insert(&si);
- }
- qwalk_close(q);
- backend->commit(msgid);
-
- return 1;
-}
diff --git a/usr.sbin/smtpd/smtp.c b/usr.sbin/smtpd/smtp.c
index fc0871c20cb..a9b217df20e 100644
--- a/usr.sbin/smtpd/smtp.c
+++ b/usr.sbin/smtpd/smtp.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: smtp.c,v 1.102 2012/06/01 14:55:09 eric Exp $ */
+/* $OpenBSD: smtp.c,v 1.103 2012/08/09 09:48:02 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -154,7 +154,7 @@ smtp_imsg(struct imsgev *iev, struct imsg *imsg)
case IMSG_SMTP_ENQUEUE:
imsg_compose_event(iev, IMSG_SMTP_ENQUEUE, 0, 0,
smtp_enqueue(NULL), imsg->data,
- sizeof(struct envelope));
+ imsg->hdr.len - sizeof imsg->hdr);
return;
}
}
diff --git a/usr.sbin/smtpd/smtpd.c b/usr.sbin/smtpd/smtpd.c
index 7fbbe7045fe..28cf891ed98 100644
--- a/usr.sbin/smtpd/smtpd.c
+++ b/usr.sbin/smtpd/smtpd.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: smtpd.c,v 1.156 2012/08/08 17:28:36 eric Exp $ */
+/* $OpenBSD: smtpd.c,v 1.157 2012/08/09 09:48:02 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -1248,8 +1248,8 @@ imsg_to_str(int type)
CASE(IMSG_QUEUE_MESSAGE_FD);
CASE(IMSG_QUEUE_MESSAGE_FILE);
- CASE(IMSG_QUEUE_SCHEDULE);
CASE(IMSG_QUEUE_REMOVE);
+ CASE(IMSG_QUEUE_EXPIRE);
CASE(IMSG_SCHEDULER_REMOVE);
CASE(IMSG_SCHEDULER_SCHEDULE);
@@ -1257,7 +1257,6 @@ imsg_to_str(int type)
CASE(IMSG_BATCH_CREATE);
CASE(IMSG_BATCH_APPEND);
CASE(IMSG_BATCH_CLOSE);
- CASE(IMSG_BATCH_DONE);
CASE(IMSG_PARENT_FORWARD_OPEN);
CASE(IMSG_PARENT_FORK_MDA);
diff --git a/usr.sbin/smtpd/smtpd.h b/usr.sbin/smtpd/smtpd.h
index 62f6ae20cfe..688a9a59b4a 100644
--- a/usr.sbin/smtpd/smtpd.h
+++ b/usr.sbin/smtpd/smtpd.h
@@ -1,4 +1,4 @@
-/* $OpenBSD: smtpd.h,v 1.316 2012/08/08 17:33:55 eric Exp $ */
+/* $OpenBSD: smtpd.h,v 1.317 2012/08/09 09:48:02 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -156,8 +156,8 @@ enum imsg_type {
IMSG_QUEUE_DELIVERY_PERMFAIL,
IMSG_QUEUE_MESSAGE_FD,
IMSG_QUEUE_MESSAGE_FILE,
- IMSG_QUEUE_SCHEDULE,
IMSG_QUEUE_REMOVE,
+ IMSG_QUEUE_EXPIRE,
IMSG_SCHEDULER_REMOVE,
IMSG_SCHEDULER_SCHEDULE,
@@ -165,7 +165,6 @@ enum imsg_type {
IMSG_BATCH_CREATE,
IMSG_BATCH_APPEND,
IMSG_BATCH_CLOSE,
- IMSG_BATCH_DONE,
IMSG_PARENT_FORWARD_OPEN,
IMSG_PARENT_FORK_MDA,
@@ -324,7 +323,6 @@ struct mailaddr {
};
enum delivery_type {
- D_INVALID = 0,
D_MDA,
D_MTA,
D_BOUNCE
@@ -1077,8 +1075,6 @@ int cmdline_symset(char *);
/* queue.c */
pid_t queue(void);
-void queue_submit_envelope(struct envelope *);
-void queue_commit_envelopes(struct envelope *);
/* queue_backend.c */