summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--usr.sbin/smtpd/bounce.c43
-rw-r--r--usr.sbin/smtpd/lka_session.c9
-rw-r--r--usr.sbin/smtpd/mda.c6
-rw-r--r--usr.sbin/smtpd/mta_session.c5
-rw-r--r--usr.sbin/smtpd/queue.c217
-rw-r--r--usr.sbin/smtpd/scheduler.c219
-rw-r--r--usr.sbin/smtpd/smtp.c4
-rw-r--r--usr.sbin/smtpd/smtpd.c5
-rw-r--r--usr.sbin/smtpd/smtpd.h8
9 files changed, 248 insertions, 268 deletions
diff --git a/usr.sbin/smtpd/bounce.c b/usr.sbin/smtpd/bounce.c
index f31b322ef29..c0248cfb3e7 100644
--- a/usr.sbin/smtpd/bounce.c
+++ b/usr.sbin/smtpd/bounce.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: bounce.c,v 1.43 2012/08/08 08:50:42 eric Exp $ */
+/* $OpenBSD: bounce.c,v 1.44 2012/08/09 09:48:02 eric Exp $ */
/*
* Copyright (c) 2009 Gilles Chehade <gilles@openbsd.org>
@@ -60,10 +60,6 @@ struct bounce {
struct io io;
};
-/* XXX remove later */
-void scheduler_envelope_update(struct envelope *);
-void scheduler_envelope_delete(struct envelope *);
-
static void bounce_send(struct bounce *, const char *, ...);
static int bounce_next(struct bounce *);
static void bounce_status(struct bounce *, const char *, ...);
@@ -197,9 +193,10 @@ bounce_next(struct bounce *bounce)
static void
bounce_status(struct bounce *bounce, const char *fmt, ...)
{
- va_list ap;
- char *status;
- int len;
+ va_list ap;
+ char *status;
+ int len, msg;
+ struct envelope *evp;
/* ignore if the envelope has already been updated/deleted */
if (bounce->evp.id == 0)
@@ -210,17 +207,26 @@ bounce_status(struct bounce *bounce, const char *fmt, ...)
fatal("bounce: vasprintf");
va_end(ap);
- if (*status == '2' || *status == '5' || *status == '6') {
- log_debug("#### %s: queue_envelope_delete: %016" PRIx64,
- __func__, bounce->evp.id);
- queue_envelope_delete(&bounce->evp);
- scheduler_envelope_delete(&bounce->evp);
+ if (*status == '2')
+ msg = IMSG_QUEUE_DELIVERY_OK;
+ else if (*status == '5' || *status == '6')
+ msg = IMSG_QUEUE_DELIVERY_PERMFAIL;
+ else
+ msg = IMSG_QUEUE_DELIVERY_TEMPFAIL;
+
+ evp = &bounce->evp;
+ if (msg == IMSG_QUEUE_DELIVERY_TEMPFAIL) {
+ evp->retry++;
+ envelope_set_errormsg(evp, "%s", status);
+ queue_envelope_update(evp);
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], msg, 0, 0, -1,
+ evp, sizeof *evp);
} else {
- bounce->evp.retry++;
- envelope_set_errormsg(&bounce->evp, "%s", status);
- queue_envelope_update(&bounce->evp);
- scheduler_envelope_update(&bounce->evp);
+ queue_envelope_delete(evp);
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], msg, 0, 0, -1,
+ &evp->id, sizeof evp->id);
}
+
bounce->evp.id = 0;
free(status);
}
@@ -234,9 +240,6 @@ bounce_free(struct bounce *bounce)
iobuf_clear(&bounce->iobuf);
io_clear(&bounce->io);
free(bounce);
-
- stat_decrement(STATS_SCHEDULER);
- stat_decrement(STATS_SCHEDULER_BOUNCES);
}
static void
diff --git a/usr.sbin/smtpd/lka_session.c b/usr.sbin/smtpd/lka_session.c
index f0eec8e037e..c85a8572e1d 100644
--- a/usr.sbin/smtpd/lka_session.c
+++ b/usr.sbin/smtpd/lka_session.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: lka_session.c,v 1.19 2012/07/29 17:21:43 gilles Exp $ */
+/* $OpenBSD: lka_session.c,v 1.20 2012/08/09 09:48:02 eric Exp $ */
/*
* Copyright (c) 2011 Gilles Chehade <gilles@openbsd.org>
@@ -310,11 +310,14 @@ lka_session_done(struct lka_session *lks)
/* process the delivery list and submit envelopes to queue */
while ((ep = TAILQ_FIRST(&lks->deliverylist)) != NULL) {
- queue_submit_envelope(ep);
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE],
+ IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, ep, sizeof *ep);
TAILQ_REMOVE(&lks->deliverylist, ep, entry);
free(ep);
}
- queue_commit_envelopes(&lks->ss.envelope);
+ ep = &lks->ss.envelope;
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE],
+ IMSG_QUEUE_COMMIT_ENVELOPES, 0, 0, -1, ep, sizeof *ep);
done:
if (lks->flags & F_ERROR) {
diff --git a/usr.sbin/smtpd/mda.c b/usr.sbin/smtpd/mda.c
index 8ebe76d607a..df5e20fca29 100644
--- a/usr.sbin/smtpd/mda.c
+++ b/usr.sbin/smtpd/mda.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: mda.c,v 1.67 2012/01/13 14:01:57 eric Exp $ */
+/* $OpenBSD: mda.c,v 1.68 2012/08/09 09:48:02 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -236,10 +236,6 @@ mda_imsg(struct imsgev *iev, struct imsg *imsg)
msgbuf_clear(&s->w);
event_del(&s->ev);
free(s);
-
- /* update queue's session count */
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_MDA_SESS_NEW, 0, 0, -1, NULL, 0);
return;
case IMSG_CTL_VERBOSE:
diff --git a/usr.sbin/smtpd/mta_session.c b/usr.sbin/smtpd/mta_session.c
index d99a08f8ee3..6345ae24d87 100644
--- a/usr.sbin/smtpd/mta_session.c
+++ b/usr.sbin/smtpd/mta_session.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: mta_session.c,v 1.6 2012/07/29 20:16:02 eric Exp $ */
+/* $OpenBSD: mta_session.c,v 1.7 2012/08/09 09:48:02 eric Exp $ */
/*
* Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
@@ -439,9 +439,6 @@ mta_enter_state(struct mta_session *s, int newstate)
if (TAILQ_FIRST(&s->tasks))
fatalx("all tasks should have been deleted already");
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_BATCH_DONE, 0, 0, -1, NULL, 0);
-
/* deallocate resources */
SPLAY_REMOVE(mta_session_tree, &mta_sessions, s);
while ((relay = TAILQ_FIRST(&s->relays))) {
diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c
index c8c2d31403c..e51f9532422 100644
--- a/usr.sbin/smtpd/queue.c
+++ b/usr.sbin/smtpd/queue.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: queue.c,v 1.122 2012/08/08 08:50:42 eric Exp $ */
+/* $OpenBSD: queue.c,v 1.123 2012/08/09 09:48:02 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -41,6 +41,7 @@
static void queue_imsg(struct imsgev *, struct imsg *);
static void queue_timeout(int, short, void *);
+static void queue_bounce(struct envelope *);
static void queue_pass_to_scheduler(struct imsgev *, struct imsg *);
static void queue_shutdown(void);
static void queue_sig_handler(int, short, void *);
@@ -48,10 +49,12 @@ static void queue_sig_handler(int, short, void *);
static void
queue_imsg(struct imsgev *iev, struct imsg *imsg)
{
+ static struct mta_batch batch, *mta_batch;
struct submit_status ss;
- struct envelope *e;
- struct mta_batch *mta_batch;
+ struct envelope *e, evp;
int fd, ret;
+ uint64_t id;
+ uint32_t msgid;
log_imsg(PROC_QUEUE, iev->proc, imsg);
@@ -66,8 +69,8 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg)
ret = queue_message_create(&ss.u.msgid);
if (ret == 0)
ss.code = 421;
- imsg_compose_event(iev, IMSG_QUEUE_CREATE_MESSAGE, 0, 0, -1,
- &ss, sizeof ss);
+ imsg_compose_event(iev, IMSG_QUEUE_CREATE_MESSAGE, 0, 0,
+ -1, &ss, sizeof ss);
return;
case IMSG_QUEUE_REMOVE_MESSAGE:
@@ -77,18 +80,18 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg)
case IMSG_QUEUE_COMMIT_MESSAGE:
ss.id = e->session_id;
ss.code = 250;
- if (queue_message_commit(evpid_to_msgid(e->id)))
+ msgid = evpid_to_msgid(e->id);
+ if (queue_message_commit(msgid)) {
stat_increment(e->flags & DF_ENQUEUED ?
STATS_QUEUE_LOCAL : STATS_QUEUE_REMOTE);
- else
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1,
+ &msgid, sizeof msgid);
+ } else
ss.code = 421;
- imsg_compose_event(iev, IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1,
- &ss, sizeof ss);
-
- if (ss.code != 421)
- queue_pass_to_scheduler(iev, imsg);
-
+ imsg_compose_event(iev, IMSG_QUEUE_COMMIT_MESSAGE, 0, 0,
+ -1, &ss, sizeof ss);
return;
case IMSG_QUEUE_MESSAGE_FILE:
@@ -96,28 +99,33 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg)
fd = queue_message_fd_rw(evpid_to_msgid(e->id));
if (fd == -1)
ss.code = 421;
- imsg_compose_event(iev, IMSG_QUEUE_MESSAGE_FILE, 0, 0, fd,
- &ss, sizeof ss);
+ imsg_compose_event(iev, IMSG_QUEUE_MESSAGE_FILE, 0, 0,
+ fd, &ss, sizeof ss);
return;
case IMSG_SMTP_ENQUEUE:
- queue_pass_to_scheduler(iev, imsg);
+ id = *(uint64_t*)(imsg->data);
+ queue_envelope_load(id, &evp);
+ bounce_session(imsg->fd, &evp);
return;
}
}
if (iev->proc == PROC_LKA) {
e = imsg->data;
-
switch (imsg->hdr.type) {
case IMSG_QUEUE_SUBMIT_ENVELOPE:
- ss.id = e->session_id;
- ret = queue_envelope_create(e);
- if (ret == 0) {
+ if (!queue_envelope_create(e)) {
+ ss.id = e->session_id;
ss.code = 421;
imsg_compose_event(env->sc_ievs[PROC_SMTP],
IMSG_QUEUE_TEMPFAIL, 0, 0, -1, &ss,
sizeof ss);
+ } else {
+ /* tell the scheduler */
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, e,
+ sizeof *e);
}
return;
@@ -132,14 +140,64 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg)
}
if (iev->proc == PROC_SCHEDULER) {
- /* forward imsgs from scheduler on its behalf */
- imsg_compose_event(env->sc_ievs[imsg->hdr.peerid], imsg->hdr.type,
- 0, imsg->hdr.pid, imsg->fd, (char *)imsg->data,
- imsg->hdr.len - sizeof imsg->hdr);
- return;
+ switch (imsg->hdr.type) {
+ case IMSG_QUEUE_REMOVE:
+ evp.id = *(uint64_t*)(imsg->data);
+ queue_envelope_delete(&evp);
+ return;
+
+ case IMSG_QUEUE_EXPIRE:
+ id = *(uint64_t*)(imsg->data);
+ queue_envelope_load(id, &evp);
+ envelope_set_errormsg(&evp, "envelope expired");
+ queue_bounce(&evp);
+ queue_envelope_delete(&evp);
+ return;
+
+ case IMSG_MDA_SESS_NEW:
+ id = *(uint64_t*)(imsg->data);
+ queue_envelope_load(id, &evp);
+ evp.lasttry = time(NULL);
+ fd = queue_message_fd_r(evpid_to_msgid(id));
+ imsg_compose_event(env->sc_ievs[PROC_MDA],
+ IMSG_MDA_SESS_NEW, 0, 0, fd, &evp, sizeof evp);
+ return;
+
+ case IMSG_SMTP_ENQUEUE:
+ id = *(uint64_t*)(imsg->data);
+ imsg_compose_event(env->sc_ievs[PROC_SMTP],
+ IMSG_SMTP_ENQUEUE, 0, 0, -1, &id, sizeof id);
+ return;
+
+ case IMSG_BATCH_CREATE:
+ bzero(&batch, sizeof batch);
+ return;
+
+ case IMSG_BATCH_APPEND:
+ id = *(uint64_t*)(imsg->data);
+ queue_envelope_load(id, &evp);
+ if (!batch.id) {
+ batch.id = generate_uid();
+ batch.relay = evp.agent.mta.relay;
+ imsg_compose_event(env->sc_ievs[PROC_MTA],
+ IMSG_BATCH_CREATE, 0, 0, -1,
+ &batch, sizeof batch);
+ }
+ evp.lasttry = time(NULL);
+ evp.batch_id = batch.id;
+ imsg_compose_event(env->sc_ievs[PROC_MTA],
+ IMSG_BATCH_APPEND, 0, 0, -1, &evp, sizeof evp);
+ return;
+
+ case IMSG_BATCH_CLOSE:
+ imsg_compose_event(env->sc_ievs[PROC_MTA],
+ IMSG_BATCH_CLOSE, 0, 0, -1,
+ &batch.id, sizeof batch.id);
+ return;
+ }
}
- if (iev->proc == PROC_MTA) {
+ if (iev->proc == PROC_MTA || iev->proc == PROC_MDA) {
switch (imsg->hdr.type) {
case IMSG_QUEUE_MESSAGE_FD:
mta_batch = imsg->data;
@@ -149,21 +207,29 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg)
return;
case IMSG_QUEUE_DELIVERY_OK:
- case IMSG_QUEUE_DELIVERY_TEMPFAIL:
- case IMSG_QUEUE_DELIVERY_PERMFAIL:
- case IMSG_BATCH_DONE:
- queue_pass_to_scheduler(iev, imsg);
+ e = imsg->data;
+ queue_envelope_delete(e);
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_QUEUE_DELIVERY_OK, 0, 0, -1, &e->id,
+ sizeof e->id);
return;
- }
- }
- if (iev->proc == PROC_MDA) {
- switch (imsg->hdr.type) {
- case IMSG_QUEUE_DELIVERY_OK:
case IMSG_QUEUE_DELIVERY_TEMPFAIL:
+ e = imsg->data;
+ e->retry++;
+ queue_envelope_update(e);
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_QUEUE_DELIVERY_TEMPFAIL, 0, 0, -1, e,
+ sizeof *e);
+ return;
+
case IMSG_QUEUE_DELIVERY_PERMFAIL:
- case IMSG_MDA_SESS_NEW:
- queue_pass_to_scheduler(iev, imsg);
+ e = imsg->data;
+ queue_bounce(e);
+ queue_envelope_delete(e);
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_QUEUE_DELIVERY_PERMFAIL, 0, 0, -1, &e->id,
+ sizeof e->id);
return;
}
}
@@ -174,7 +240,6 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg)
case IMSG_QUEUE_PAUSE_MTA:
case IMSG_QUEUE_RESUME_MDA:
case IMSG_QUEUE_RESUME_MTA:
- case IMSG_QUEUE_SCHEDULE:
case IMSG_QUEUE_REMOVE:
queue_pass_to_scheduler(iev, imsg);
return;
@@ -202,6 +267,36 @@ queue_pass_to_scheduler(struct imsgev *iev, struct imsg *imsg)
}
static void
+queue_bounce(struct envelope *e)
+{
+ struct envelope b;
+ uint32_t msgid;
+
+ b = *e;
+ b.type = D_BOUNCE;
+ b.retry = 0;
+ b.lasttry = 0;
+ b.creation = time(NULL);
+ b.expire = 3600 * 24 * 7;
+
+ if (e->type == D_BOUNCE) {
+ log_warnx("queue: double bounce!");
+ } else if (e->sender.user[0] == '\0') {
+ log_warnx("queue: no return path!");
+ } else if (!queue_envelope_create(&b)) {
+ log_warnx("queue: cannot bounce!");
+ } else {
+ log_debug("queue: bouncing evp:%016" PRIx64
+ " as evp:%016" PRIx64, e->id, b.id);
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, &b, sizeof b);
+ msgid = evpid_to_msgid(b.id);
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &msgid, sizeof msgid);
+ }
+}
+
+static void
queue_sig_handler(int sig, short event, void *p)
{
switch (sig) {
@@ -302,12 +397,12 @@ queue(void)
return (0);
}
-void
+static void
queue_timeout(int fd, short event, void *p)
{
static struct qwalk *q = NULL;
+ static uint32_t last_msgid = 0;
struct event *ev = p;
- static uint64_t last_evpid = 0;
struct envelope envelope;
struct timeval tv;
uint64_t evpid;
@@ -318,47 +413,29 @@ queue_timeout(int fd, short event, void *p)
}
while (qwalk(q, &evpid)) {
- if (! queue_envelope_load(evpid, &envelope))
- continue;
-
- if (evpid_to_msgid(evpid) != evpid_to_msgid(last_evpid) &&
- last_evpid != 0) {
- envelope.id = last_evpid;
+ if (queue_envelope_load(evpid, &envelope))
imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
- IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &envelope,
+ IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, &envelope,
sizeof envelope);
- }
- last_evpid = evpid;
+ if (last_msgid && evpid_to_msgid(evpid) != last_msgid)
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &last_msgid,
+ sizeof last_msgid);
+
+ last_msgid = evpid_to_msgid(evpid);
tv.tv_sec = 0;
tv.tv_usec = 0;
evtimer_add(ev, &tv);
return;
}
- if (last_evpid) {
- envelope.id = last_evpid;
+ if (last_msgid) {
imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
- IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &envelope,
- sizeof envelope);
+ IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &last_msgid,
+ sizeof last_msgid);
}
- log_info("queue: done loading queue into scheduler");
+ log_debug("queue: done loading queue into scheduler");
qwalk_close(q);
}
-
-void
-queue_submit_envelope(struct envelope *ep)
-{
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1,
- ep, sizeof(*ep));
-}
-
-void
-queue_commit_envelopes(struct envelope *ep)
-{
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_QUEUE_COMMIT_ENVELOPES, 0, 0, -1,
- ep, sizeof(*ep));
-}
diff --git a/usr.sbin/smtpd/scheduler.c b/usr.sbin/smtpd/scheduler.c
index 9efaf8029a4..8dbf95d6880 100644
--- a/usr.sbin/smtpd/scheduler.c
+++ b/usr.sbin/smtpd/scheduler.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: scheduler.c,v 1.8 2012/08/08 08:50:42 eric Exp $ */
+/* $OpenBSD: scheduler.c,v 1.9 2012/08/09 09:48:02 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -57,9 +57,6 @@ static void scheduler_process_mda(struct scheduler_batch *);
static void scheduler_process_mta(struct scheduler_batch *);
static int scheduler_load_message(u_int32_t);
-void scheduler_envelope_update(struct envelope *);
-void scheduler_envelope_delete(struct envelope *);
-
static struct scheduler_backend *backend = NULL;
extern const char *backend_scheduler;
@@ -67,127 +64,104 @@ extern const char *backend_scheduler;
void
scheduler_imsg(struct imsgev *iev, struct imsg *imsg)
{
- struct envelope *e, bounce;
- struct scheduler_info si;
+ struct envelope *e;
+ struct scheduler_info si;
+ uint64_t id;
+ uint32_t msgid;
log_imsg(PROC_SCHEDULER, iev->proc, imsg);
switch (imsg->hdr.type) {
- case IMSG_QUEUE_COMMIT_MESSAGE:
+
+ case IMSG_QUEUE_SUBMIT_ENVELOPE:
e = imsg->data;
log_trace(TRACE_SCHEDULER,
- "scheduler: IMSG_QUEUE_COMMIT_MESSAGE: %016"PRIx64, e->id);
- scheduler_load_message(evpid_to_msgid(e->id));
+ "scheduler: inserting evp:%016" PRIx64, e->id);
+ scheduler_info(&si, e);
+ backend->insert(&si);
+ return;
+
+ case IMSG_QUEUE_COMMIT_MESSAGE:
+ msgid = *(uint32_t *)(imsg->data);
+ log_trace(TRACE_SCHEDULER,
+ "scheduler: commiting msg:%08" PRIx32, msgid);
+ backend->commit(msgid);
+ scheduler_reset_events();
+ return;
+
+ case IMSG_QUEUE_TEMPFAIL:
+ msgid = *(uint32_t *)(imsg->data);
+ log_trace(TRACE_SCHEDULER, "scheduler: aborting msg:%08" PRIx32,
+ msgid);
+ backend->rollback(msgid);
scheduler_reset_events();
return;
case IMSG_QUEUE_DELIVERY_OK:
- stat_decrement(STATS_SCHEDULER);
- e = imsg->data;
+ id = *(uint64_t *)(imsg->data);
log_trace(TRACE_SCHEDULER,
- "scheduler: IMSG_QUEUE_DELIVERY_OK: %016"PRIx64, e->id);
- backend->delete(e->id);
- queue_envelope_delete(e);
+ "scheduler: deleting evp:%016" PRIx64 " (ok)", id);
+ backend->delete(id);
scheduler_reset_events();
return;
case IMSG_QUEUE_DELIVERY_TEMPFAIL:
- stat_decrement(STATS_SCHEDULER);
e = imsg->data;
log_trace(TRACE_SCHEDULER,
- "scheduler: IMSG_QUEUE_DELIVERY_TEMPFAIL: %016"PRIx64, e->id);
- e->retry++;
- queue_envelope_update(e);
+ "scheduler: updating evp:%016" PRIx64, e->id);
scheduler_info(&si, e);
backend->update(&si);
scheduler_reset_events();
return;
case IMSG_QUEUE_DELIVERY_PERMFAIL:
- stat_decrement(STATS_SCHEDULER);
- e = imsg->data;
+ id = *(uint64_t *)(imsg->data);
log_trace(TRACE_SCHEDULER,
- "scheduler: IMSG_QUEUE_DELIVERY_PERMFAIL: %016"PRIx64, e->id);
- if (e->type != D_BOUNCE && e->sender.user[0] != '\0') {
- bounce_record_message(e, &bounce);
- scheduler_info(&si, &bounce);
- backend->insert(&si);
- backend->commit(evpid_to_msgid(bounce.id));
- }
- backend->delete(e->id);
- queue_envelope_delete(e);
- scheduler_reset_events();
- return;
-
- case IMSG_MDA_SESS_NEW:
- log_trace(TRACE_SCHEDULER, "scheduler: IMSG_MDA_SESS_NEW");
- stat_decrement(STATS_MDA_SESSION);
- if (env->sc_maxconn - stat_get(STATS_MDA_SESSION, STAT_ACTIVE))
- env->sc_flags &= ~SMTPD_MDA_BUSY;
+ "scheduler: deleting evp:%016" PRIx64 " (fail)", id);
+ backend->delete(id);
scheduler_reset_events();
return;
- case IMSG_BATCH_DONE:
- log_trace(TRACE_SCHEDULER, "scheduler: IMSG_BATCH_DONE");
- stat_decrement(STATS_MTA_SESSION);
- if (env->sc_maxconn - stat_get(STATS_MTA_SESSION, STAT_ACTIVE))
- env->sc_flags &= ~SMTPD_MTA_BUSY;
- scheduler_reset_events();
- return;
-
- case IMSG_SMTP_ENQUEUE:
- e = imsg->data;
- log_trace(TRACE_SCHEDULER,
- "scheduler: IMSG_SMTP_ENQUEUE: %016"PRIx64, e->id);
- if (imsg->fd < 0 || !bounce_session(imsg->fd, e)) {
- queue_envelope_update(e);
- scheduler_info(&si, e);
- backend->update(&si);
- scheduler_reset_events();
- return;
- }
- return;
-
case IMSG_QUEUE_PAUSE_MDA:
- log_trace(TRACE_SCHEDULER, "scheduler: IMSG_QUEUE_PAUSE_MDA");
+ log_trace(TRACE_SCHEDULER, "scheduler: pausing mda");
env->sc_flags |= SMTPD_MDA_PAUSED;
return;
case IMSG_QUEUE_RESUME_MDA:
- log_trace(TRACE_SCHEDULER, "scheduler: IMSG_QUEUE_RESUME_MDA");
+ log_trace(TRACE_SCHEDULER, "scheduler: resuming mda");
env->sc_flags &= ~SMTPD_MDA_PAUSED;
scheduler_reset_events();
return;
case IMSG_QUEUE_PAUSE_MTA:
- log_trace(TRACE_SCHEDULER, "scheduler: IMSG_QUEUE_PAUSE_MTA");
+ log_trace(TRACE_SCHEDULER, "scheduler: pausing mta");
env->sc_flags |= SMTPD_MTA_PAUSED;
return;
case IMSG_QUEUE_RESUME_MTA:
- log_trace(TRACE_SCHEDULER, "scheduler: IMSG_QUEUE_RESUME_MTA");
+ log_trace(TRACE_SCHEDULER, "scheduler: resuming mta");
env->sc_flags &= ~SMTPD_MTA_PAUSED;
scheduler_reset_events();
return;
case IMSG_CTL_VERBOSE:
- log_trace(TRACE_SCHEDULER, "scheduler: IMSG_CTL_VERBOSE");
log_verbose(*(int *)imsg->data);
return;
case IMSG_SCHEDULER_SCHEDULE:
- log_trace(TRACE_SCHEDULER,
- "scheduler: IMSG_SCHEDULER_SCHEDULE: %016"PRIx64,
- *(u_int64_t *)imsg->data);
- backend->schedule(*(u_int64_t *)imsg->data);
+ id = *(uint64_t *)(imsg->data);
+ log_debug("scheduler: scheduling evp:%016" PRIx64, id);
+ backend->schedule(id);
scheduler_reset_events();
return;
case IMSG_SCHEDULER_REMOVE:
- log_trace(TRACE_SCHEDULER,
- "scheduler: IMSG_SCHEDULER_REMOVE: %016"PRIx64,
- *(u_int64_t *)imsg->data);
- backend->remove(*(u_int64_t *)imsg->data);
+ id = *(uint64_t *)(imsg->data);
+ if (id <= 0xffffffffL)
+ log_debug("scheduler: removing msg:%08" PRIx64, id);
+ else
+ log_debug("scheduler: removing evp:%016" PRIx64, id);
+ backend->remove(id);
scheduler_reset_events();
return;
}
@@ -358,15 +332,15 @@ scheduler_timeout(int fd, short event, void *p)
static void
scheduler_process_remove(struct scheduler_batch *batch)
{
- struct envelope evp;
struct id_list *e;
while ((e = batch->evpids)) {
batch->evpids = e->next;
- log_debug("scheduler: deleting evp:%016" PRIx64 " (removed)",
+ log_debug("scheduler: evp:%016" PRIx64 " removed",
e->id);
- evp.id = e->id;
- queue_envelope_delete(&evp);
+ backend->delete(e->id);
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_QUEUE_REMOVE,
+ 0, 0, -1, &e->id, sizeof e->id);
free(e);
}
}
@@ -374,15 +348,15 @@ scheduler_process_remove(struct scheduler_batch *batch)
static void
scheduler_process_expire(struct scheduler_batch *batch)
{
- struct envelope evp;
struct id_list *e;
while ((e = batch->evpids)) {
batch->evpids = e->next;
- log_debug("scheduler: deleting evp:%016" PRIx64 " (expire)",
+ log_debug("scheduler: evp:%016" PRIx64 " expired",
e->id);
- evp.id = e->id;
- queue_envelope_delete(&evp);
+ backend->delete(e->id);
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_QUEUE_EXPIRE,
+ 0, 0, -1, &e->id, sizeof e->id);
free(e);
}
}
@@ -390,20 +364,14 @@ scheduler_process_expire(struct scheduler_batch *batch)
static void
scheduler_process_bounce(struct scheduler_batch *batch)
{
- struct envelope evp;
struct id_list *e;
while ((e = batch->evpids)) {
batch->evpids = e->next;
- log_debug("scheduler: scheduling evp:%016" PRIx64 " (bounce)",
+ log_debug("scheduler: evp:%016" PRIx64 " scheduled (bounce)",
e->id);
- queue_envelope_load(e->id, &evp);
- evp.lasttry = time(NULL);
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_SMTP_ENQUEUE, PROC_SMTP, 0, -1, &evp,
- sizeof evp);
- stat_increment(STATS_SCHEDULER);
- stat_increment(STATS_SCHEDULER_BOUNCES);
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_SMTP_ENQUEUE,
+ 0, 0, -1, &e->id, sizeof e->id);
free(e);
}
}
@@ -411,22 +379,14 @@ scheduler_process_bounce(struct scheduler_batch *batch)
static void
scheduler_process_mda(struct scheduler_batch *batch)
{
- struct envelope evp;
struct id_list *e;
- int fd;
while ((e = batch->evpids)) {
batch->evpids = e->next;
- log_debug("scheduler: scheduling evp:%016" PRIx64 " (mda)",
+ log_debug("scheduler: evp:%016" PRIx64 " scheduled (mda)",
e->id);
- queue_envelope_load(e->id, &evp);
- evp.lasttry = time(NULL);
- fd = queue_message_fd_r(evpid_to_msgid(evp.id));
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_MDA_SESS_NEW, PROC_MDA, 0, fd, &evp,
- sizeof evp);
- stat_increment(STATS_SCHEDULER);
- stat_increment(STATS_MDA_SESSION);
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_MDA_SESS_NEW,
+ 0, 0, -1, &e->id, sizeof e->id);
free(e);
}
}
@@ -434,73 +394,22 @@ scheduler_process_mda(struct scheduler_batch *batch)
static void
scheduler_process_mta(struct scheduler_batch *batch)
{
- struct envelope evp;
- struct mta_batch mta_batch;
struct id_list *e;
- queue_envelope_load(batch->evpids->id, &evp);
-
- bzero(&mta_batch, sizeof mta_batch);
- mta_batch.id = arc4random();
- mta_batch.relay = evp.agent.mta.relay;
-
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_BATCH_CREATE, PROC_MTA, 0, -1, &mta_batch,
- sizeof mta_batch);
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_BATCH_CREATE,
+ 0, 0, -1, NULL, 0);
while ((e = batch->evpids)) {
batch->evpids = e->next;
- log_debug("scheduler: scheduling evp:%016" PRIx64 " (mta)",
+ log_debug("scheduler: evp:%016" PRIx64 " scheduled (mta)",
e->id);
- queue_envelope_load(e->id, &evp);
- evp.lasttry = time(NULL);
- evp.batch_id = mta_batch.id;
imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_BATCH_APPEND,
- PROC_MTA, 0, -1, &evp, sizeof evp);
+ 0, 0, -1, &e->id, sizeof e->id);
free(e);
- stat_increment(STATS_SCHEDULER);
}
imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_BATCH_CLOSE,
- PROC_MTA, 0, -1, &mta_batch, sizeof mta_batch);
+ 0, 0, -1, NULL, 0);
stat_increment(STATS_MTA_SESSION);
}
-
-void
-scheduler_envelope_update(struct envelope *e)
-{
- struct scheduler_info si;
-
- scheduler_info(&si, e);
- backend->update(&si);
- scheduler_reset_events();
-}
-
-void
-scheduler_envelope_delete(struct envelope *e)
-{
- backend->delete(e->id);
- scheduler_reset_events();
-}
-
-static int
-scheduler_load_message(u_int32_t msgid)
-{
- struct qwalk *q;
- u_int64_t evpid;
- struct envelope envelope;
- struct scheduler_info si;
-
- q = qwalk_new(msgid);
- while (qwalk(q, &evpid)) {
- if (! queue_envelope_load(evpid, &envelope))
- continue;
- scheduler_info(&si, &envelope);
- backend->insert(&si);
- }
- qwalk_close(q);
- backend->commit(msgid);
-
- return 1;
-}
diff --git a/usr.sbin/smtpd/smtp.c b/usr.sbin/smtpd/smtp.c
index fc0871c20cb..a9b217df20e 100644
--- a/usr.sbin/smtpd/smtp.c
+++ b/usr.sbin/smtpd/smtp.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: smtp.c,v 1.102 2012/06/01 14:55:09 eric Exp $ */
+/* $OpenBSD: smtp.c,v 1.103 2012/08/09 09:48:02 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -154,7 +154,7 @@ smtp_imsg(struct imsgev *iev, struct imsg *imsg)
case IMSG_SMTP_ENQUEUE:
imsg_compose_event(iev, IMSG_SMTP_ENQUEUE, 0, 0,
smtp_enqueue(NULL), imsg->data,
- sizeof(struct envelope));
+ imsg->hdr.len - sizeof imsg->hdr);
return;
}
}
diff --git a/usr.sbin/smtpd/smtpd.c b/usr.sbin/smtpd/smtpd.c
index 7fbbe7045fe..28cf891ed98 100644
--- a/usr.sbin/smtpd/smtpd.c
+++ b/usr.sbin/smtpd/smtpd.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: smtpd.c,v 1.156 2012/08/08 17:28:36 eric Exp $ */
+/* $OpenBSD: smtpd.c,v 1.157 2012/08/09 09:48:02 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -1248,8 +1248,8 @@ imsg_to_str(int type)
CASE(IMSG_QUEUE_MESSAGE_FD);
CASE(IMSG_QUEUE_MESSAGE_FILE);
- CASE(IMSG_QUEUE_SCHEDULE);
CASE(IMSG_QUEUE_REMOVE);
+ CASE(IMSG_QUEUE_EXPIRE);
CASE(IMSG_SCHEDULER_REMOVE);
CASE(IMSG_SCHEDULER_SCHEDULE);
@@ -1257,7 +1257,6 @@ imsg_to_str(int type)
CASE(IMSG_BATCH_CREATE);
CASE(IMSG_BATCH_APPEND);
CASE(IMSG_BATCH_CLOSE);
- CASE(IMSG_BATCH_DONE);
CASE(IMSG_PARENT_FORWARD_OPEN);
CASE(IMSG_PARENT_FORK_MDA);
diff --git a/usr.sbin/smtpd/smtpd.h b/usr.sbin/smtpd/smtpd.h
index 62f6ae20cfe..688a9a59b4a 100644
--- a/usr.sbin/smtpd/smtpd.h
+++ b/usr.sbin/smtpd/smtpd.h
@@ -1,4 +1,4 @@
-/* $OpenBSD: smtpd.h,v 1.316 2012/08/08 17:33:55 eric Exp $ */
+/* $OpenBSD: smtpd.h,v 1.317 2012/08/09 09:48:02 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -156,8 +156,8 @@ enum imsg_type {
IMSG_QUEUE_DELIVERY_PERMFAIL,
IMSG_QUEUE_MESSAGE_FD,
IMSG_QUEUE_MESSAGE_FILE,
- IMSG_QUEUE_SCHEDULE,
IMSG_QUEUE_REMOVE,
+ IMSG_QUEUE_EXPIRE,
IMSG_SCHEDULER_REMOVE,
IMSG_SCHEDULER_SCHEDULE,
@@ -165,7 +165,6 @@ enum imsg_type {
IMSG_BATCH_CREATE,
IMSG_BATCH_APPEND,
IMSG_BATCH_CLOSE,
- IMSG_BATCH_DONE,
IMSG_PARENT_FORWARD_OPEN,
IMSG_PARENT_FORK_MDA,
@@ -324,7 +323,6 @@ struct mailaddr {
};
enum delivery_type {
- D_INVALID = 0,
D_MDA,
D_MTA,
D_BOUNCE
@@ -1077,8 +1075,6 @@ int cmdline_symset(char *);
/* queue.c */
pid_t queue(void);
-void queue_submit_envelope(struct envelope *);
-void queue_commit_envelopes(struct envelope *);
/* queue_backend.c */