diff options
-rw-r--r-- | usr.sbin/smtpd/bounce.c | 43 | ||||
-rw-r--r-- | usr.sbin/smtpd/lka_session.c | 9 | ||||
-rw-r--r-- | usr.sbin/smtpd/mda.c | 6 | ||||
-rw-r--r-- | usr.sbin/smtpd/mta_session.c | 5 | ||||
-rw-r--r-- | usr.sbin/smtpd/queue.c | 217 | ||||
-rw-r--r-- | usr.sbin/smtpd/scheduler.c | 219 | ||||
-rw-r--r-- | usr.sbin/smtpd/smtp.c | 4 | ||||
-rw-r--r-- | usr.sbin/smtpd/smtpd.c | 5 | ||||
-rw-r--r-- | usr.sbin/smtpd/smtpd.h | 8 |
9 files changed, 248 insertions, 268 deletions
diff --git a/usr.sbin/smtpd/bounce.c b/usr.sbin/smtpd/bounce.c index f31b322ef29..c0248cfb3e7 100644 --- a/usr.sbin/smtpd/bounce.c +++ b/usr.sbin/smtpd/bounce.c @@ -1,4 +1,4 @@ -/* $OpenBSD: bounce.c,v 1.43 2012/08/08 08:50:42 eric Exp $ */ +/* $OpenBSD: bounce.c,v 1.44 2012/08/09 09:48:02 eric Exp $ */ /* * Copyright (c) 2009 Gilles Chehade <gilles@openbsd.org> @@ -60,10 +60,6 @@ struct bounce { struct io io; }; -/* XXX remove later */ -void scheduler_envelope_update(struct envelope *); -void scheduler_envelope_delete(struct envelope *); - static void bounce_send(struct bounce *, const char *, ...); static int bounce_next(struct bounce *); static void bounce_status(struct bounce *, const char *, ...); @@ -197,9 +193,10 @@ bounce_next(struct bounce *bounce) static void bounce_status(struct bounce *bounce, const char *fmt, ...) { - va_list ap; - char *status; - int len; + va_list ap; + char *status; + int len, msg; + struct envelope *evp; /* ignore if the envelope has already been updated/deleted */ if (bounce->evp.id == 0) @@ -210,17 +207,26 @@ bounce_status(struct bounce *bounce, const char *fmt, ...) fatal("bounce: vasprintf"); va_end(ap); - if (*status == '2' || *status == '5' || *status == '6') { - log_debug("#### %s: queue_envelope_delete: %016" PRIx64, - __func__, bounce->evp.id); - queue_envelope_delete(&bounce->evp); - scheduler_envelope_delete(&bounce->evp); + if (*status == '2') + msg = IMSG_QUEUE_DELIVERY_OK; + else if (*status == '5' || *status == '6') + msg = IMSG_QUEUE_DELIVERY_PERMFAIL; + else + msg = IMSG_QUEUE_DELIVERY_TEMPFAIL; + + evp = &bounce->evp; + if (msg == IMSG_QUEUE_DELIVERY_TEMPFAIL) { + evp->retry++; + envelope_set_errormsg(evp, "%s", status); + queue_envelope_update(evp); + imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], msg, 0, 0, -1, + evp, sizeof *evp); } else { - bounce->evp.retry++; - envelope_set_errormsg(&bounce->evp, "%s", status); - queue_envelope_update(&bounce->evp); - scheduler_envelope_update(&bounce->evp); + queue_envelope_delete(evp); + imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], msg, 0, 0, -1, + &evp->id, sizeof evp->id); } + bounce->evp.id = 0; free(status); } @@ -234,9 +240,6 @@ bounce_free(struct bounce *bounce) iobuf_clear(&bounce->iobuf); io_clear(&bounce->io); free(bounce); - - stat_decrement(STATS_SCHEDULER); - stat_decrement(STATS_SCHEDULER_BOUNCES); } static void diff --git a/usr.sbin/smtpd/lka_session.c b/usr.sbin/smtpd/lka_session.c index f0eec8e037e..c85a8572e1d 100644 --- a/usr.sbin/smtpd/lka_session.c +++ b/usr.sbin/smtpd/lka_session.c @@ -1,4 +1,4 @@ -/* $OpenBSD: lka_session.c,v 1.19 2012/07/29 17:21:43 gilles Exp $ */ +/* $OpenBSD: lka_session.c,v 1.20 2012/08/09 09:48:02 eric Exp $ */ /* * Copyright (c) 2011 Gilles Chehade <gilles@openbsd.org> @@ -310,11 +310,14 @@ lka_session_done(struct lka_session *lks) /* process the delivery list and submit envelopes to queue */ while ((ep = TAILQ_FIRST(&lks->deliverylist)) != NULL) { - queue_submit_envelope(ep); + imsg_compose_event(env->sc_ievs[PROC_QUEUE], + IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, ep, sizeof *ep); TAILQ_REMOVE(&lks->deliverylist, ep, entry); free(ep); } - queue_commit_envelopes(&lks->ss.envelope); + ep = &lks->ss.envelope; + imsg_compose_event(env->sc_ievs[PROC_QUEUE], + IMSG_QUEUE_COMMIT_ENVELOPES, 0, 0, -1, ep, sizeof *ep); done: if (lks->flags & F_ERROR) { diff --git a/usr.sbin/smtpd/mda.c b/usr.sbin/smtpd/mda.c index 8ebe76d607a..df5e20fca29 100644 --- a/usr.sbin/smtpd/mda.c +++ b/usr.sbin/smtpd/mda.c @@ -1,4 +1,4 @@ -/* $OpenBSD: mda.c,v 1.67 2012/01/13 14:01:57 eric Exp $ */ +/* $OpenBSD: mda.c,v 1.68 2012/08/09 09:48:02 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> @@ -236,10 +236,6 @@ mda_imsg(struct imsgev *iev, struct imsg *imsg) msgbuf_clear(&s->w); event_del(&s->ev); free(s); - - /* update queue's session count */ - imsg_compose_event(env->sc_ievs[PROC_QUEUE], - IMSG_MDA_SESS_NEW, 0, 0, -1, NULL, 0); return; case IMSG_CTL_VERBOSE: diff --git a/usr.sbin/smtpd/mta_session.c b/usr.sbin/smtpd/mta_session.c index d99a08f8ee3..6345ae24d87 100644 --- a/usr.sbin/smtpd/mta_session.c +++ b/usr.sbin/smtpd/mta_session.c @@ -1,4 +1,4 @@ -/* $OpenBSD: mta_session.c,v 1.6 2012/07/29 20:16:02 eric Exp $ */ +/* $OpenBSD: mta_session.c,v 1.7 2012/08/09 09:48:02 eric Exp $ */ /* * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> @@ -439,9 +439,6 @@ mta_enter_state(struct mta_session *s, int newstate) if (TAILQ_FIRST(&s->tasks)) fatalx("all tasks should have been deleted already"); - imsg_compose_event(env->sc_ievs[PROC_QUEUE], - IMSG_BATCH_DONE, 0, 0, -1, NULL, 0); - /* deallocate resources */ SPLAY_REMOVE(mta_session_tree, &mta_sessions, s); while ((relay = TAILQ_FIRST(&s->relays))) { diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c index c8c2d31403c..e51f9532422 100644 --- a/usr.sbin/smtpd/queue.c +++ b/usr.sbin/smtpd/queue.c @@ -1,4 +1,4 @@ -/* $OpenBSD: queue.c,v 1.122 2012/08/08 08:50:42 eric Exp $ */ +/* $OpenBSD: queue.c,v 1.123 2012/08/09 09:48:02 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> @@ -41,6 +41,7 @@ static void queue_imsg(struct imsgev *, struct imsg *); static void queue_timeout(int, short, void *); +static void queue_bounce(struct envelope *); static void queue_pass_to_scheduler(struct imsgev *, struct imsg *); static void queue_shutdown(void); static void queue_sig_handler(int, short, void *); @@ -48,10 +49,12 @@ static void queue_sig_handler(int, short, void *); static void queue_imsg(struct imsgev *iev, struct imsg *imsg) { + static struct mta_batch batch, *mta_batch; struct submit_status ss; - struct envelope *e; - struct mta_batch *mta_batch; + struct envelope *e, evp; int fd, ret; + uint64_t id; + uint32_t msgid; log_imsg(PROC_QUEUE, iev->proc, imsg); @@ -66,8 +69,8 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg) ret = queue_message_create(&ss.u.msgid); if (ret == 0) ss.code = 421; - imsg_compose_event(iev, IMSG_QUEUE_CREATE_MESSAGE, 0, 0, -1, - &ss, sizeof ss); + imsg_compose_event(iev, IMSG_QUEUE_CREATE_MESSAGE, 0, 0, + -1, &ss, sizeof ss); return; case IMSG_QUEUE_REMOVE_MESSAGE: @@ -77,18 +80,18 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg) case IMSG_QUEUE_COMMIT_MESSAGE: ss.id = e->session_id; ss.code = 250; - if (queue_message_commit(evpid_to_msgid(e->id))) + msgid = evpid_to_msgid(e->id); + if (queue_message_commit(msgid)) { stat_increment(e->flags & DF_ENQUEUED ? STATS_QUEUE_LOCAL : STATS_QUEUE_REMOTE); - else + imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], + IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, + &msgid, sizeof msgid); + } else ss.code = 421; - imsg_compose_event(iev, IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, - &ss, sizeof ss); - - if (ss.code != 421) - queue_pass_to_scheduler(iev, imsg); - + imsg_compose_event(iev, IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, + -1, &ss, sizeof ss); return; case IMSG_QUEUE_MESSAGE_FILE: @@ -96,28 +99,33 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg) fd = queue_message_fd_rw(evpid_to_msgid(e->id)); if (fd == -1) ss.code = 421; - imsg_compose_event(iev, IMSG_QUEUE_MESSAGE_FILE, 0, 0, fd, - &ss, sizeof ss); + imsg_compose_event(iev, IMSG_QUEUE_MESSAGE_FILE, 0, 0, + fd, &ss, sizeof ss); return; case IMSG_SMTP_ENQUEUE: - queue_pass_to_scheduler(iev, imsg); + id = *(uint64_t*)(imsg->data); + queue_envelope_load(id, &evp); + bounce_session(imsg->fd, &evp); return; } } if (iev->proc == PROC_LKA) { e = imsg->data; - switch (imsg->hdr.type) { case IMSG_QUEUE_SUBMIT_ENVELOPE: - ss.id = e->session_id; - ret = queue_envelope_create(e); - if (ret == 0) { + if (!queue_envelope_create(e)) { + ss.id = e->session_id; ss.code = 421; imsg_compose_event(env->sc_ievs[PROC_SMTP], IMSG_QUEUE_TEMPFAIL, 0, 0, -1, &ss, sizeof ss); + } else { + /* tell the scheduler */ + imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], + IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, e, + sizeof *e); } return; @@ -132,14 +140,64 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg) } if (iev->proc == PROC_SCHEDULER) { - /* forward imsgs from scheduler on its behalf */ - imsg_compose_event(env->sc_ievs[imsg->hdr.peerid], imsg->hdr.type, - 0, imsg->hdr.pid, imsg->fd, (char *)imsg->data, - imsg->hdr.len - sizeof imsg->hdr); - return; + switch (imsg->hdr.type) { + case IMSG_QUEUE_REMOVE: + evp.id = *(uint64_t*)(imsg->data); + queue_envelope_delete(&evp); + return; + + case IMSG_QUEUE_EXPIRE: + id = *(uint64_t*)(imsg->data); + queue_envelope_load(id, &evp); + envelope_set_errormsg(&evp, "envelope expired"); + queue_bounce(&evp); + queue_envelope_delete(&evp); + return; + + case IMSG_MDA_SESS_NEW: + id = *(uint64_t*)(imsg->data); + queue_envelope_load(id, &evp); + evp.lasttry = time(NULL); + fd = queue_message_fd_r(evpid_to_msgid(id)); + imsg_compose_event(env->sc_ievs[PROC_MDA], + IMSG_MDA_SESS_NEW, 0, 0, fd, &evp, sizeof evp); + return; + + case IMSG_SMTP_ENQUEUE: + id = *(uint64_t*)(imsg->data); + imsg_compose_event(env->sc_ievs[PROC_SMTP], + IMSG_SMTP_ENQUEUE, 0, 0, -1, &id, sizeof id); + return; + + case IMSG_BATCH_CREATE: + bzero(&batch, sizeof batch); + return; + + case IMSG_BATCH_APPEND: + id = *(uint64_t*)(imsg->data); + queue_envelope_load(id, &evp); + if (!batch.id) { + batch.id = generate_uid(); + batch.relay = evp.agent.mta.relay; + imsg_compose_event(env->sc_ievs[PROC_MTA], + IMSG_BATCH_CREATE, 0, 0, -1, + &batch, sizeof batch); + } + evp.lasttry = time(NULL); + evp.batch_id = batch.id; + imsg_compose_event(env->sc_ievs[PROC_MTA], + IMSG_BATCH_APPEND, 0, 0, -1, &evp, sizeof evp); + return; + + case IMSG_BATCH_CLOSE: + imsg_compose_event(env->sc_ievs[PROC_MTA], + IMSG_BATCH_CLOSE, 0, 0, -1, + &batch.id, sizeof batch.id); + return; + } } - if (iev->proc == PROC_MTA) { + if (iev->proc == PROC_MTA || iev->proc == PROC_MDA) { switch (imsg->hdr.type) { case IMSG_QUEUE_MESSAGE_FD: mta_batch = imsg->data; @@ -149,21 +207,29 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg) return; case IMSG_QUEUE_DELIVERY_OK: - case IMSG_QUEUE_DELIVERY_TEMPFAIL: - case IMSG_QUEUE_DELIVERY_PERMFAIL: - case IMSG_BATCH_DONE: - queue_pass_to_scheduler(iev, imsg); + e = imsg->data; + queue_envelope_delete(e); + imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], + IMSG_QUEUE_DELIVERY_OK, 0, 0, -1, &e->id, + sizeof e->id); return; - } - } - if (iev->proc == PROC_MDA) { - switch (imsg->hdr.type) { - case IMSG_QUEUE_DELIVERY_OK: case IMSG_QUEUE_DELIVERY_TEMPFAIL: + e = imsg->data; + e->retry++; + queue_envelope_update(e); + imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], + IMSG_QUEUE_DELIVERY_TEMPFAIL, 0, 0, -1, e, + sizeof *e); + return; + case IMSG_QUEUE_DELIVERY_PERMFAIL: - case IMSG_MDA_SESS_NEW: - queue_pass_to_scheduler(iev, imsg); + e = imsg->data; + queue_bounce(e); + queue_envelope_delete(e); + imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], + IMSG_QUEUE_DELIVERY_PERMFAIL, 0, 0, -1, &e->id, + sizeof e->id); return; } } @@ -174,7 +240,6 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg) case IMSG_QUEUE_PAUSE_MTA: case IMSG_QUEUE_RESUME_MDA: case IMSG_QUEUE_RESUME_MTA: - case IMSG_QUEUE_SCHEDULE: case IMSG_QUEUE_REMOVE: queue_pass_to_scheduler(iev, imsg); return; @@ -202,6 +267,36 @@ queue_pass_to_scheduler(struct imsgev *iev, struct imsg *imsg) } static void +queue_bounce(struct envelope *e) +{ + struct envelope b; + uint32_t msgid; + + b = *e; + b.type = D_BOUNCE; + b.retry = 0; + b.lasttry = 0; + b.creation = time(NULL); + b.expire = 3600 * 24 * 7; + + if (e->type == D_BOUNCE) { + log_warnx("queue: double bounce!"); + } else if (e->sender.user[0] == '\0') { + log_warnx("queue: no return path!"); + } else if (!queue_envelope_create(&b)) { + log_warnx("queue: cannot bounce!"); + } else { + log_debug("queue: bouncing evp:%016" PRIx64 + " as evp:%016" PRIx64, e->id, b.id); + imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], + IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, &b, sizeof b); + msgid = evpid_to_msgid(b.id); + imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], + IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &msgid, sizeof msgid); + } +} + +static void queue_sig_handler(int sig, short event, void *p) { switch (sig) { @@ -302,12 +397,12 @@ queue(void) return (0); } -void +static void queue_timeout(int fd, short event, void *p) { static struct qwalk *q = NULL; + static uint32_t last_msgid = 0; struct event *ev = p; - static uint64_t last_evpid = 0; struct envelope envelope; struct timeval tv; uint64_t evpid; @@ -318,47 +413,29 @@ queue_timeout(int fd, short event, void *p) } while (qwalk(q, &evpid)) { - if (! queue_envelope_load(evpid, &envelope)) - continue; - - if (evpid_to_msgid(evpid) != evpid_to_msgid(last_evpid) && - last_evpid != 0) { - envelope.id = last_evpid; + if (queue_envelope_load(evpid, &envelope)) imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], - IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &envelope, + IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, &envelope, sizeof envelope); - } - last_evpid = evpid; + if (last_msgid && evpid_to_msgid(evpid) != last_msgid) + imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], + IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &last_msgid, + sizeof last_msgid); + + last_msgid = evpid_to_msgid(evpid); tv.tv_sec = 0; tv.tv_usec = 0; evtimer_add(ev, &tv); return; } - if (last_evpid) { - envelope.id = last_evpid; + if (last_msgid) { imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], - IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &envelope, - sizeof envelope); + IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &last_msgid, + sizeof last_msgid); } - log_info("queue: done loading queue into scheduler"); + log_debug("queue: done loading queue into scheduler"); qwalk_close(q); } - -void -queue_submit_envelope(struct envelope *ep) -{ - imsg_compose_event(env->sc_ievs[PROC_QUEUE], - IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, - ep, sizeof(*ep)); -} - -void -queue_commit_envelopes(struct envelope *ep) -{ - imsg_compose_event(env->sc_ievs[PROC_QUEUE], - IMSG_QUEUE_COMMIT_ENVELOPES, 0, 0, -1, - ep, sizeof(*ep)); -} diff --git a/usr.sbin/smtpd/scheduler.c b/usr.sbin/smtpd/scheduler.c index 9efaf8029a4..8dbf95d6880 100644 --- a/usr.sbin/smtpd/scheduler.c +++ b/usr.sbin/smtpd/scheduler.c @@ -1,4 +1,4 @@ -/* $OpenBSD: scheduler.c,v 1.8 2012/08/08 08:50:42 eric Exp $ */ +/* $OpenBSD: scheduler.c,v 1.9 2012/08/09 09:48:02 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> @@ -57,9 +57,6 @@ static void scheduler_process_mda(struct scheduler_batch *); static void scheduler_process_mta(struct scheduler_batch *); static int scheduler_load_message(u_int32_t); -void scheduler_envelope_update(struct envelope *); -void scheduler_envelope_delete(struct envelope *); - static struct scheduler_backend *backend = NULL; extern const char *backend_scheduler; @@ -67,127 +64,104 @@ extern const char *backend_scheduler; void scheduler_imsg(struct imsgev *iev, struct imsg *imsg) { - struct envelope *e, bounce; - struct scheduler_info si; + struct envelope *e; + struct scheduler_info si; + uint64_t id; + uint32_t msgid; log_imsg(PROC_SCHEDULER, iev->proc, imsg); switch (imsg->hdr.type) { - case IMSG_QUEUE_COMMIT_MESSAGE: + + case IMSG_QUEUE_SUBMIT_ENVELOPE: e = imsg->data; log_trace(TRACE_SCHEDULER, - "scheduler: IMSG_QUEUE_COMMIT_MESSAGE: %016"PRIx64, e->id); - scheduler_load_message(evpid_to_msgid(e->id)); + "scheduler: inserting evp:%016" PRIx64, e->id); + scheduler_info(&si, e); + backend->insert(&si); + return; + + case IMSG_QUEUE_COMMIT_MESSAGE: + msgid = *(uint32_t *)(imsg->data); + log_trace(TRACE_SCHEDULER, + "scheduler: commiting msg:%08" PRIx32, msgid); + backend->commit(msgid); + scheduler_reset_events(); + return; + + case IMSG_QUEUE_TEMPFAIL: + msgid = *(uint32_t *)(imsg->data); + log_trace(TRACE_SCHEDULER, "scheduler: aborting msg:%08" PRIx32, + msgid); + backend->rollback(msgid); scheduler_reset_events(); return; case IMSG_QUEUE_DELIVERY_OK: - stat_decrement(STATS_SCHEDULER); - e = imsg->data; + id = *(uint64_t *)(imsg->data); log_trace(TRACE_SCHEDULER, - "scheduler: IMSG_QUEUE_DELIVERY_OK: %016"PRIx64, e->id); - backend->delete(e->id); - queue_envelope_delete(e); + "scheduler: deleting evp:%016" PRIx64 " (ok)", id); + backend->delete(id); scheduler_reset_events(); return; case IMSG_QUEUE_DELIVERY_TEMPFAIL: - stat_decrement(STATS_SCHEDULER); e = imsg->data; log_trace(TRACE_SCHEDULER, - "scheduler: IMSG_QUEUE_DELIVERY_TEMPFAIL: %016"PRIx64, e->id); - e->retry++; - queue_envelope_update(e); + "scheduler: updating evp:%016" PRIx64, e->id); scheduler_info(&si, e); backend->update(&si); scheduler_reset_events(); return; case IMSG_QUEUE_DELIVERY_PERMFAIL: - stat_decrement(STATS_SCHEDULER); - e = imsg->data; + id = *(uint64_t *)(imsg->data); log_trace(TRACE_SCHEDULER, - "scheduler: IMSG_QUEUE_DELIVERY_PERMFAIL: %016"PRIx64, e->id); - if (e->type != D_BOUNCE && e->sender.user[0] != '\0') { - bounce_record_message(e, &bounce); - scheduler_info(&si, &bounce); - backend->insert(&si); - backend->commit(evpid_to_msgid(bounce.id)); - } - backend->delete(e->id); - queue_envelope_delete(e); - scheduler_reset_events(); - return; - - case IMSG_MDA_SESS_NEW: - log_trace(TRACE_SCHEDULER, "scheduler: IMSG_MDA_SESS_NEW"); - stat_decrement(STATS_MDA_SESSION); - if (env->sc_maxconn - stat_get(STATS_MDA_SESSION, STAT_ACTIVE)) - env->sc_flags &= ~SMTPD_MDA_BUSY; + "scheduler: deleting evp:%016" PRIx64 " (fail)", id); + backend->delete(id); scheduler_reset_events(); return; - case IMSG_BATCH_DONE: - log_trace(TRACE_SCHEDULER, "scheduler: IMSG_BATCH_DONE"); - stat_decrement(STATS_MTA_SESSION); - if (env->sc_maxconn - stat_get(STATS_MTA_SESSION, STAT_ACTIVE)) - env->sc_flags &= ~SMTPD_MTA_BUSY; - scheduler_reset_events(); - return; - - case IMSG_SMTP_ENQUEUE: - e = imsg->data; - log_trace(TRACE_SCHEDULER, - "scheduler: IMSG_SMTP_ENQUEUE: %016"PRIx64, e->id); - if (imsg->fd < 0 || !bounce_session(imsg->fd, e)) { - queue_envelope_update(e); - scheduler_info(&si, e); - backend->update(&si); - scheduler_reset_events(); - return; - } - return; - case IMSG_QUEUE_PAUSE_MDA: - log_trace(TRACE_SCHEDULER, "scheduler: IMSG_QUEUE_PAUSE_MDA"); + log_trace(TRACE_SCHEDULER, "scheduler: pausing mda"); env->sc_flags |= SMTPD_MDA_PAUSED; return; case IMSG_QUEUE_RESUME_MDA: - log_trace(TRACE_SCHEDULER, "scheduler: IMSG_QUEUE_RESUME_MDA"); + log_trace(TRACE_SCHEDULER, "scheduler: resuming mda"); env->sc_flags &= ~SMTPD_MDA_PAUSED; scheduler_reset_events(); return; case IMSG_QUEUE_PAUSE_MTA: - log_trace(TRACE_SCHEDULER, "scheduler: IMSG_QUEUE_PAUSE_MTA"); + log_trace(TRACE_SCHEDULER, "scheduler: pausing mta"); env->sc_flags |= SMTPD_MTA_PAUSED; return; case IMSG_QUEUE_RESUME_MTA: - log_trace(TRACE_SCHEDULER, "scheduler: IMSG_QUEUE_RESUME_MTA"); + log_trace(TRACE_SCHEDULER, "scheduler: resuming mta"); env->sc_flags &= ~SMTPD_MTA_PAUSED; scheduler_reset_events(); return; case IMSG_CTL_VERBOSE: - log_trace(TRACE_SCHEDULER, "scheduler: IMSG_CTL_VERBOSE"); log_verbose(*(int *)imsg->data); return; case IMSG_SCHEDULER_SCHEDULE: - log_trace(TRACE_SCHEDULER, - "scheduler: IMSG_SCHEDULER_SCHEDULE: %016"PRIx64, - *(u_int64_t *)imsg->data); - backend->schedule(*(u_int64_t *)imsg->data); + id = *(uint64_t *)(imsg->data); + log_debug("scheduler: scheduling evp:%016" PRIx64, id); + backend->schedule(id); scheduler_reset_events(); return; case IMSG_SCHEDULER_REMOVE: - log_trace(TRACE_SCHEDULER, - "scheduler: IMSG_SCHEDULER_REMOVE: %016"PRIx64, - *(u_int64_t *)imsg->data); - backend->remove(*(u_int64_t *)imsg->data); + id = *(uint64_t *)(imsg->data); + if (id <= 0xffffffffL) + log_debug("scheduler: removing msg:%08" PRIx64, id); + else + log_debug("scheduler: removing evp:%016" PRIx64, id); + backend->remove(id); scheduler_reset_events(); return; } @@ -358,15 +332,15 @@ scheduler_timeout(int fd, short event, void *p) static void scheduler_process_remove(struct scheduler_batch *batch) { - struct envelope evp; struct id_list *e; while ((e = batch->evpids)) { batch->evpids = e->next; - log_debug("scheduler: deleting evp:%016" PRIx64 " (removed)", + log_debug("scheduler: evp:%016" PRIx64 " removed", e->id); - evp.id = e->id; - queue_envelope_delete(&evp); + backend->delete(e->id); + imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_QUEUE_REMOVE, + 0, 0, -1, &e->id, sizeof e->id); free(e); } } @@ -374,15 +348,15 @@ scheduler_process_remove(struct scheduler_batch *batch) static void scheduler_process_expire(struct scheduler_batch *batch) { - struct envelope evp; struct id_list *e; while ((e = batch->evpids)) { batch->evpids = e->next; - log_debug("scheduler: deleting evp:%016" PRIx64 " (expire)", + log_debug("scheduler: evp:%016" PRIx64 " expired", e->id); - evp.id = e->id; - queue_envelope_delete(&evp); + backend->delete(e->id); + imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_QUEUE_EXPIRE, + 0, 0, -1, &e->id, sizeof e->id); free(e); } } @@ -390,20 +364,14 @@ scheduler_process_expire(struct scheduler_batch *batch) static void scheduler_process_bounce(struct scheduler_batch *batch) { - struct envelope evp; struct id_list *e; while ((e = batch->evpids)) { batch->evpids = e->next; - log_debug("scheduler: scheduling evp:%016" PRIx64 " (bounce)", + log_debug("scheduler: evp:%016" PRIx64 " scheduled (bounce)", e->id); - queue_envelope_load(e->id, &evp); - evp.lasttry = time(NULL); - imsg_compose_event(env->sc_ievs[PROC_QUEUE], - IMSG_SMTP_ENQUEUE, PROC_SMTP, 0, -1, &evp, - sizeof evp); - stat_increment(STATS_SCHEDULER); - stat_increment(STATS_SCHEDULER_BOUNCES); + imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_SMTP_ENQUEUE, + 0, 0, -1, &e->id, sizeof e->id); free(e); } } @@ -411,22 +379,14 @@ scheduler_process_bounce(struct scheduler_batch *batch) static void scheduler_process_mda(struct scheduler_batch *batch) { - struct envelope evp; struct id_list *e; - int fd; while ((e = batch->evpids)) { batch->evpids = e->next; - log_debug("scheduler: scheduling evp:%016" PRIx64 " (mda)", + log_debug("scheduler: evp:%016" PRIx64 " scheduled (mda)", e->id); - queue_envelope_load(e->id, &evp); - evp.lasttry = time(NULL); - fd = queue_message_fd_r(evpid_to_msgid(evp.id)); - imsg_compose_event(env->sc_ievs[PROC_QUEUE], - IMSG_MDA_SESS_NEW, PROC_MDA, 0, fd, &evp, - sizeof evp); - stat_increment(STATS_SCHEDULER); - stat_increment(STATS_MDA_SESSION); + imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_MDA_SESS_NEW, + 0, 0, -1, &e->id, sizeof e->id); free(e); } } @@ -434,73 +394,22 @@ scheduler_process_mda(struct scheduler_batch *batch) static void scheduler_process_mta(struct scheduler_batch *batch) { - struct envelope evp; - struct mta_batch mta_batch; struct id_list *e; - queue_envelope_load(batch->evpids->id, &evp); - - bzero(&mta_batch, sizeof mta_batch); - mta_batch.id = arc4random(); - mta_batch.relay = evp.agent.mta.relay; - - imsg_compose_event(env->sc_ievs[PROC_QUEUE], - IMSG_BATCH_CREATE, PROC_MTA, 0, -1, &mta_batch, - sizeof mta_batch); + imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_BATCH_CREATE, + 0, 0, -1, NULL, 0); while ((e = batch->evpids)) { batch->evpids = e->next; - log_debug("scheduler: scheduling evp:%016" PRIx64 " (mta)", + log_debug("scheduler: evp:%016" PRIx64 " scheduled (mta)", e->id); - queue_envelope_load(e->id, &evp); - evp.lasttry = time(NULL); - evp.batch_id = mta_batch.id; imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_BATCH_APPEND, - PROC_MTA, 0, -1, &evp, sizeof evp); + 0, 0, -1, &e->id, sizeof e->id); free(e); - stat_increment(STATS_SCHEDULER); } imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_BATCH_CLOSE, - PROC_MTA, 0, -1, &mta_batch, sizeof mta_batch); + 0, 0, -1, NULL, 0); stat_increment(STATS_MTA_SESSION); } - -void -scheduler_envelope_update(struct envelope *e) -{ - struct scheduler_info si; - - scheduler_info(&si, e); - backend->update(&si); - scheduler_reset_events(); -} - -void -scheduler_envelope_delete(struct envelope *e) -{ - backend->delete(e->id); - scheduler_reset_events(); -} - -static int -scheduler_load_message(u_int32_t msgid) -{ - struct qwalk *q; - u_int64_t evpid; - struct envelope envelope; - struct scheduler_info si; - - q = qwalk_new(msgid); - while (qwalk(q, &evpid)) { - if (! queue_envelope_load(evpid, &envelope)) - continue; - scheduler_info(&si, &envelope); - backend->insert(&si); - } - qwalk_close(q); - backend->commit(msgid); - - return 1; -} diff --git a/usr.sbin/smtpd/smtp.c b/usr.sbin/smtpd/smtp.c index fc0871c20cb..a9b217df20e 100644 --- a/usr.sbin/smtpd/smtp.c +++ b/usr.sbin/smtpd/smtp.c @@ -1,4 +1,4 @@ -/* $OpenBSD: smtp.c,v 1.102 2012/06/01 14:55:09 eric Exp $ */ +/* $OpenBSD: smtp.c,v 1.103 2012/08/09 09:48:02 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> @@ -154,7 +154,7 @@ smtp_imsg(struct imsgev *iev, struct imsg *imsg) case IMSG_SMTP_ENQUEUE: imsg_compose_event(iev, IMSG_SMTP_ENQUEUE, 0, 0, smtp_enqueue(NULL), imsg->data, - sizeof(struct envelope)); + imsg->hdr.len - sizeof imsg->hdr); return; } } diff --git a/usr.sbin/smtpd/smtpd.c b/usr.sbin/smtpd/smtpd.c index 7fbbe7045fe..28cf891ed98 100644 --- a/usr.sbin/smtpd/smtpd.c +++ b/usr.sbin/smtpd/smtpd.c @@ -1,4 +1,4 @@ -/* $OpenBSD: smtpd.c,v 1.156 2012/08/08 17:28:36 eric Exp $ */ +/* $OpenBSD: smtpd.c,v 1.157 2012/08/09 09:48:02 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> @@ -1248,8 +1248,8 @@ imsg_to_str(int type) CASE(IMSG_QUEUE_MESSAGE_FD); CASE(IMSG_QUEUE_MESSAGE_FILE); - CASE(IMSG_QUEUE_SCHEDULE); CASE(IMSG_QUEUE_REMOVE); + CASE(IMSG_QUEUE_EXPIRE); CASE(IMSG_SCHEDULER_REMOVE); CASE(IMSG_SCHEDULER_SCHEDULE); @@ -1257,7 +1257,6 @@ imsg_to_str(int type) CASE(IMSG_BATCH_CREATE); CASE(IMSG_BATCH_APPEND); CASE(IMSG_BATCH_CLOSE); - CASE(IMSG_BATCH_DONE); CASE(IMSG_PARENT_FORWARD_OPEN); CASE(IMSG_PARENT_FORK_MDA); diff --git a/usr.sbin/smtpd/smtpd.h b/usr.sbin/smtpd/smtpd.h index 62f6ae20cfe..688a9a59b4a 100644 --- a/usr.sbin/smtpd/smtpd.h +++ b/usr.sbin/smtpd/smtpd.h @@ -1,4 +1,4 @@ -/* $OpenBSD: smtpd.h,v 1.316 2012/08/08 17:33:55 eric Exp $ */ +/* $OpenBSD: smtpd.h,v 1.317 2012/08/09 09:48:02 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> @@ -156,8 +156,8 @@ enum imsg_type { IMSG_QUEUE_DELIVERY_PERMFAIL, IMSG_QUEUE_MESSAGE_FD, IMSG_QUEUE_MESSAGE_FILE, - IMSG_QUEUE_SCHEDULE, IMSG_QUEUE_REMOVE, + IMSG_QUEUE_EXPIRE, IMSG_SCHEDULER_REMOVE, IMSG_SCHEDULER_SCHEDULE, @@ -165,7 +165,6 @@ enum imsg_type { IMSG_BATCH_CREATE, IMSG_BATCH_APPEND, IMSG_BATCH_CLOSE, - IMSG_BATCH_DONE, IMSG_PARENT_FORWARD_OPEN, IMSG_PARENT_FORK_MDA, @@ -324,7 +323,6 @@ struct mailaddr { }; enum delivery_type { - D_INVALID = 0, D_MDA, D_MTA, D_BOUNCE @@ -1077,8 +1075,6 @@ int cmdline_symset(char *); /* queue.c */ pid_t queue(void); -void queue_submit_envelope(struct envelope *); -void queue_commit_envelopes(struct envelope *); /* queue_backend.c */ |