diff options
Diffstat (limited to 'usr.sbin/smtpd/queue.c')
-rw-r--r-- | usr.sbin/smtpd/queue.c | 217 |
1 files changed, 147 insertions, 70 deletions
diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c index c8c2d31403c..e51f9532422 100644 --- a/usr.sbin/smtpd/queue.c +++ b/usr.sbin/smtpd/queue.c @@ -1,4 +1,4 @@ -/* $OpenBSD: queue.c,v 1.122 2012/08/08 08:50:42 eric Exp $ */ +/* $OpenBSD: queue.c,v 1.123 2012/08/09 09:48:02 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> @@ -41,6 +41,7 @@ static void queue_imsg(struct imsgev *, struct imsg *); static void queue_timeout(int, short, void *); +static void queue_bounce(struct envelope *); static void queue_pass_to_scheduler(struct imsgev *, struct imsg *); static void queue_shutdown(void); static void queue_sig_handler(int, short, void *); @@ -48,10 +49,12 @@ static void queue_sig_handler(int, short, void *); static void queue_imsg(struct imsgev *iev, struct imsg *imsg) { + static struct mta_batch batch, *mta_batch; struct submit_status ss; - struct envelope *e; - struct mta_batch *mta_batch; + struct envelope *e, evp; int fd, ret; + uint64_t id; + uint32_t msgid; log_imsg(PROC_QUEUE, iev->proc, imsg); @@ -66,8 +69,8 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg) ret = queue_message_create(&ss.u.msgid); if (ret == 0) ss.code = 421; - imsg_compose_event(iev, IMSG_QUEUE_CREATE_MESSAGE, 0, 0, -1, - &ss, sizeof ss); + imsg_compose_event(iev, IMSG_QUEUE_CREATE_MESSAGE, 0, 0, + -1, &ss, sizeof ss); return; case IMSG_QUEUE_REMOVE_MESSAGE: @@ -77,18 +80,18 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg) case IMSG_QUEUE_COMMIT_MESSAGE: ss.id = e->session_id; ss.code = 250; - if (queue_message_commit(evpid_to_msgid(e->id))) + msgid = evpid_to_msgid(e->id); + if (queue_message_commit(msgid)) { stat_increment(e->flags & DF_ENQUEUED ? STATS_QUEUE_LOCAL : STATS_QUEUE_REMOTE); - else + imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], + IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, + &msgid, sizeof msgid); + } else ss.code = 421; - imsg_compose_event(iev, IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, - &ss, sizeof ss); - - if (ss.code != 421) - queue_pass_to_scheduler(iev, imsg); - + imsg_compose_event(iev, IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, + -1, &ss, sizeof ss); return; case IMSG_QUEUE_MESSAGE_FILE: @@ -96,28 +99,33 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg) fd = queue_message_fd_rw(evpid_to_msgid(e->id)); if (fd == -1) ss.code = 421; - imsg_compose_event(iev, IMSG_QUEUE_MESSAGE_FILE, 0, 0, fd, - &ss, sizeof ss); + imsg_compose_event(iev, IMSG_QUEUE_MESSAGE_FILE, 0, 0, + fd, &ss, sizeof ss); return; case IMSG_SMTP_ENQUEUE: - queue_pass_to_scheduler(iev, imsg); + id = *(uint64_t*)(imsg->data); + queue_envelope_load(id, &evp); + bounce_session(imsg->fd, &evp); return; } } if (iev->proc == PROC_LKA) { e = imsg->data; - switch (imsg->hdr.type) { case IMSG_QUEUE_SUBMIT_ENVELOPE: - ss.id = e->session_id; - ret = queue_envelope_create(e); - if (ret == 0) { + if (!queue_envelope_create(e)) { + ss.id = e->session_id; ss.code = 421; imsg_compose_event(env->sc_ievs[PROC_SMTP], IMSG_QUEUE_TEMPFAIL, 0, 0, -1, &ss, sizeof ss); + } else { + /* tell the scheduler */ + imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], + IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, e, + sizeof *e); } return; @@ -132,14 +140,64 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg) } if (iev->proc == PROC_SCHEDULER) { - /* forward imsgs from scheduler on its behalf */ - imsg_compose_event(env->sc_ievs[imsg->hdr.peerid], imsg->hdr.type, - 0, imsg->hdr.pid, imsg->fd, (char *)imsg->data, - imsg->hdr.len - sizeof imsg->hdr); - return; + switch (imsg->hdr.type) { + case IMSG_QUEUE_REMOVE: + evp.id = *(uint64_t*)(imsg->data); + queue_envelope_delete(&evp); + return; + + case IMSG_QUEUE_EXPIRE: + id = *(uint64_t*)(imsg->data); + queue_envelope_load(id, &evp); + envelope_set_errormsg(&evp, "envelope expired"); + queue_bounce(&evp); + queue_envelope_delete(&evp); + return; + + case IMSG_MDA_SESS_NEW: + id = *(uint64_t*)(imsg->data); + queue_envelope_load(id, &evp); + evp.lasttry = time(NULL); + fd = queue_message_fd_r(evpid_to_msgid(id)); + imsg_compose_event(env->sc_ievs[PROC_MDA], + IMSG_MDA_SESS_NEW, 0, 0, fd, &evp, sizeof evp); + return; + + case IMSG_SMTP_ENQUEUE: + id = *(uint64_t*)(imsg->data); + imsg_compose_event(env->sc_ievs[PROC_SMTP], + IMSG_SMTP_ENQUEUE, 0, 0, -1, &id, sizeof id); + return; + + case IMSG_BATCH_CREATE: + bzero(&batch, sizeof batch); + return; + + case IMSG_BATCH_APPEND: + id = *(uint64_t*)(imsg->data); + queue_envelope_load(id, &evp); + if (!batch.id) { + batch.id = generate_uid(); + batch.relay = evp.agent.mta.relay; + imsg_compose_event(env->sc_ievs[PROC_MTA], + IMSG_BATCH_CREATE, 0, 0, -1, + &batch, sizeof batch); + } + evp.lasttry = time(NULL); + evp.batch_id = batch.id; + imsg_compose_event(env->sc_ievs[PROC_MTA], + IMSG_BATCH_APPEND, 0, 0, -1, &evp, sizeof evp); + return; + + case IMSG_BATCH_CLOSE: + imsg_compose_event(env->sc_ievs[PROC_MTA], + IMSG_BATCH_CLOSE, 0, 0, -1, + &batch.id, sizeof batch.id); + return; + } } - if (iev->proc == PROC_MTA) { + if (iev->proc == PROC_MTA || iev->proc == PROC_MDA) { switch (imsg->hdr.type) { case IMSG_QUEUE_MESSAGE_FD: mta_batch = imsg->data; @@ -149,21 +207,29 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg) return; case IMSG_QUEUE_DELIVERY_OK: - case IMSG_QUEUE_DELIVERY_TEMPFAIL: - case IMSG_QUEUE_DELIVERY_PERMFAIL: - case IMSG_BATCH_DONE: - queue_pass_to_scheduler(iev, imsg); + e = imsg->data; + queue_envelope_delete(e); + imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], + IMSG_QUEUE_DELIVERY_OK, 0, 0, -1, &e->id, + sizeof e->id); return; - } - } - if (iev->proc == PROC_MDA) { - switch (imsg->hdr.type) { - case IMSG_QUEUE_DELIVERY_OK: case IMSG_QUEUE_DELIVERY_TEMPFAIL: + e = imsg->data; + e->retry++; + queue_envelope_update(e); + imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], + IMSG_QUEUE_DELIVERY_TEMPFAIL, 0, 0, -1, e, + sizeof *e); + return; + case IMSG_QUEUE_DELIVERY_PERMFAIL: - case IMSG_MDA_SESS_NEW: - queue_pass_to_scheduler(iev, imsg); + e = imsg->data; + queue_bounce(e); + queue_envelope_delete(e); + imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], + IMSG_QUEUE_DELIVERY_PERMFAIL, 0, 0, -1, &e->id, + sizeof e->id); return; } } @@ -174,7 +240,6 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg) case IMSG_QUEUE_PAUSE_MTA: case IMSG_QUEUE_RESUME_MDA: case IMSG_QUEUE_RESUME_MTA: - case IMSG_QUEUE_SCHEDULE: case IMSG_QUEUE_REMOVE: queue_pass_to_scheduler(iev, imsg); return; @@ -202,6 +267,36 @@ queue_pass_to_scheduler(struct imsgev *iev, struct imsg *imsg) } static void +queue_bounce(struct envelope *e) +{ + struct envelope b; + uint32_t msgid; + + b = *e; + b.type = D_BOUNCE; + b.retry = 0; + b.lasttry = 0; + b.creation = time(NULL); + b.expire = 3600 * 24 * 7; + + if (e->type == D_BOUNCE) { + log_warnx("queue: double bounce!"); + } else if (e->sender.user[0] == '\0') { + log_warnx("queue: no return path!"); + } else if (!queue_envelope_create(&b)) { + log_warnx("queue: cannot bounce!"); + } else { + log_debug("queue: bouncing evp:%016" PRIx64 + " as evp:%016" PRIx64, e->id, b.id); + imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], + IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, &b, sizeof b); + msgid = evpid_to_msgid(b.id); + imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], + IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &msgid, sizeof msgid); + } +} + +static void queue_sig_handler(int sig, short event, void *p) { switch (sig) { @@ -302,12 +397,12 @@ queue(void) return (0); } -void +static void queue_timeout(int fd, short event, void *p) { static struct qwalk *q = NULL; + static uint32_t last_msgid = 0; struct event *ev = p; - static uint64_t last_evpid = 0; struct envelope envelope; struct timeval tv; uint64_t evpid; @@ -318,47 +413,29 @@ queue_timeout(int fd, short event, void *p) } while (qwalk(q, &evpid)) { - if (! queue_envelope_load(evpid, &envelope)) - continue; - - if (evpid_to_msgid(evpid) != evpid_to_msgid(last_evpid) && - last_evpid != 0) { - envelope.id = last_evpid; + if (queue_envelope_load(evpid, &envelope)) imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], - IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &envelope, + IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, &envelope, sizeof envelope); - } - last_evpid = evpid; + if (last_msgid && evpid_to_msgid(evpid) != last_msgid) + imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], + IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &last_msgid, + sizeof last_msgid); + + last_msgid = evpid_to_msgid(evpid); tv.tv_sec = 0; tv.tv_usec = 0; evtimer_add(ev, &tv); return; } - if (last_evpid) { - envelope.id = last_evpid; + if (last_msgid) { imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], - IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &envelope, - sizeof envelope); + IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &last_msgid, + sizeof last_msgid); } - log_info("queue: done loading queue into scheduler"); + log_debug("queue: done loading queue into scheduler"); qwalk_close(q); } - -void -queue_submit_envelope(struct envelope *ep) -{ - imsg_compose_event(env->sc_ievs[PROC_QUEUE], - IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, - ep, sizeof(*ep)); -} - -void -queue_commit_envelopes(struct envelope *ep) -{ - imsg_compose_event(env->sc_ievs[PROC_QUEUE], - IMSG_QUEUE_COMMIT_ENVELOPES, 0, 0, -1, - ep, sizeof(*ep)); -} |