diff options
Diffstat (limited to 'usr.sbin/smtpd/queue.c')
-rw-r--r-- | usr.sbin/smtpd/queue.c | 85 |
1 files changed, 84 insertions, 1 deletions
diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c index eff14eb18b3..dbe12312660 100644 --- a/usr.sbin/smtpd/queue.c +++ b/usr.sbin/smtpd/queue.c @@ -1,4 +1,4 @@ -/* $OpenBSD: queue.c,v 1.169 2015/10/16 13:37:44 millert Exp $ */ +/* $OpenBSD: queue.c,v 1.170 2015/10/29 10:25:36 sunil Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org> @@ -47,6 +47,7 @@ static void queue_bounce(struct envelope *, struct delivery_bounce *); static void queue_shutdown(void); static void queue_sig_handler(int, short, void *); static void queue_log(const struct envelope *, const char *, const char *); +static void queue_msgid_walk(int, short, void *); static size_t flow_agent_hiwat = 10 * 1024 * 1024; static size_t flow_agent_lowat = 1 * 1024 * 1024; @@ -62,6 +63,8 @@ static void queue_imsg(struct mproc *p, struct imsg *imsg) { struct delivery_bounce bounce; + struct msg_walkinfo *wi; + struct timeval tv; struct bounce_req_msg *req_bounce; struct envelope evp; struct msg m; @@ -69,6 +72,7 @@ queue_imsg(struct mproc *p, struct imsg *imsg) uint64_t reqid, evpid, holdq; uint32_t msgid; time_t nexttry; + size_t n_evp; int fd, mta_ext, ret, v, flags, code; memset(&bounce, 0, sizeof(struct delivery_bounce)); @@ -500,6 +504,48 @@ queue_imsg(struct mproc *p, struct imsg *imsg) m_end(&m); profiling = v; return; + + case IMSG_CTL_DISCOVER_EVPID: + m_msg(&m, imsg); + m_get_evpid(&m, &evpid); + m_end(&m); + if (queue_envelope_load(evpid, &evp) == 0) { + log_warnx("queue: discover: failed to load " + "envelope %016" PRIx64, evpid); + n_evp = 0; + m_compose(p_control, imsg->hdr.type, + imsg->hdr.peerid, 0, -1, + &n_evp, sizeof n_evp); + return; + } + + m_create(p_scheduler, IMSG_QUEUE_DISCOVER_EVPID, + 0, 0, -1); + m_add_envelope(p_scheduler, &evp); + m_close(p_scheduler); + + m_create(p_scheduler, IMSG_QUEUE_DISCOVER_MSGID, + 0, 0, -1); + m_add_msgid(p_scheduler, evpid_to_msgid(evpid)); + m_close(p_scheduler); + n_evp = 1; + m_compose(p_control, imsg->hdr.type, imsg->hdr.peerid, + 0, -1, &n_evp, sizeof n_evp); + return; + + case IMSG_CTL_DISCOVER_MSGID: + m_msg(&m, imsg); + m_get_msgid(&m, &msgid); + m_end(&m); + /* handle concurrent walk requests */ + wi = xcalloc(1, sizeof *wi, "queu_imsg"); + wi->msgid = msgid; + wi->peerid = imsg->hdr.peerid; + evtimer_set(&wi->ev, queue_msgid_walk, wi); + tv.tv_sec = 0; + tv.tv_usec = 10; + evtimer_add(&wi->ev, &tv); + return; } } @@ -507,6 +553,43 @@ queue_imsg(struct mproc *p, struct imsg *imsg) } static void +queue_msgid_walk(int fd, short event, void *arg) +{ + struct envelope evp; + struct timeval tv; + struct msg_walkinfo *wi = arg; + int r; + + r = queue_message_walk(&evp, wi->msgid, &wi->done, &wi->data); + if (r == -1) { + if (wi->n_evp) { + m_create(p_scheduler, IMSG_QUEUE_DISCOVER_MSGID, + 0, 0, -1); + m_add_msgid(p_scheduler, wi->msgid); + m_close(p_scheduler); + } + + m_compose(p_control, IMSG_CTL_DISCOVER_MSGID, wi->peerid, 0, -1, + &wi->n_evp, sizeof wi->n_evp); + evtimer_del(&wi->ev); + free(wi); + return; + } + + if (r) { + m_create(p_scheduler, IMSG_QUEUE_DISCOVER_EVPID, 0, 0, -1); + m_add_envelope(p_scheduler, &evp); + m_close(p_scheduler); + wi->n_evp += 1; + } + + tv.tv_sec = 0; + tv.tv_usec = 10; + evtimer_set(&wi->ev, queue_msgid_walk, wi); + evtimer_add(&wi->ev, &tv); +} + +static void queue_bounce(struct envelope *e, struct delivery_bounce *d) { struct envelope b; |