summaryrefslogtreecommitdiff
path: root/usr.sbin/smtpd/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'usr.sbin/smtpd/queue.c')
-rw-r--r--usr.sbin/smtpd/queue.c85
1 files changed, 84 insertions, 1 deletions
diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c
index eff14eb18b3..dbe12312660 100644
--- a/usr.sbin/smtpd/queue.c
+++ b/usr.sbin/smtpd/queue.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: queue.c,v 1.169 2015/10/16 13:37:44 millert Exp $ */
+/* $OpenBSD: queue.c,v 1.170 2015/10/29 10:25:36 sunil Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@poolp.org>
@@ -47,6 +47,7 @@ static void queue_bounce(struct envelope *, struct delivery_bounce *);
static void queue_shutdown(void);
static void queue_sig_handler(int, short, void *);
static void queue_log(const struct envelope *, const char *, const char *);
+static void queue_msgid_walk(int, short, void *);
static size_t flow_agent_hiwat = 10 * 1024 * 1024;
static size_t flow_agent_lowat = 1 * 1024 * 1024;
@@ -62,6 +63,8 @@ static void
queue_imsg(struct mproc *p, struct imsg *imsg)
{
struct delivery_bounce bounce;
+ struct msg_walkinfo *wi;
+ struct timeval tv;
struct bounce_req_msg *req_bounce;
struct envelope evp;
struct msg m;
@@ -69,6 +72,7 @@ queue_imsg(struct mproc *p, struct imsg *imsg)
uint64_t reqid, evpid, holdq;
uint32_t msgid;
time_t nexttry;
+ size_t n_evp;
int fd, mta_ext, ret, v, flags, code;
memset(&bounce, 0, sizeof(struct delivery_bounce));
@@ -500,6 +504,48 @@ queue_imsg(struct mproc *p, struct imsg *imsg)
m_end(&m);
profiling = v;
return;
+
+ case IMSG_CTL_DISCOVER_EVPID:
+ m_msg(&m, imsg);
+ m_get_evpid(&m, &evpid);
+ m_end(&m);
+ if (queue_envelope_load(evpid, &evp) == 0) {
+ log_warnx("queue: discover: failed to load "
+ "envelope %016" PRIx64, evpid);
+ n_evp = 0;
+ m_compose(p_control, imsg->hdr.type,
+ imsg->hdr.peerid, 0, -1,
+ &n_evp, sizeof n_evp);
+ return;
+ }
+
+ m_create(p_scheduler, IMSG_QUEUE_DISCOVER_EVPID,
+ 0, 0, -1);
+ m_add_envelope(p_scheduler, &evp);
+ m_close(p_scheduler);
+
+ m_create(p_scheduler, IMSG_QUEUE_DISCOVER_MSGID,
+ 0, 0, -1);
+ m_add_msgid(p_scheduler, evpid_to_msgid(evpid));
+ m_close(p_scheduler);
+ n_evp = 1;
+ m_compose(p_control, imsg->hdr.type, imsg->hdr.peerid,
+ 0, -1, &n_evp, sizeof n_evp);
+ return;
+
+ case IMSG_CTL_DISCOVER_MSGID:
+ m_msg(&m, imsg);
+ m_get_msgid(&m, &msgid);
+ m_end(&m);
+ /* handle concurrent walk requests */
+ wi = xcalloc(1, sizeof *wi, "queu_imsg");
+ wi->msgid = msgid;
+ wi->peerid = imsg->hdr.peerid;
+ evtimer_set(&wi->ev, queue_msgid_walk, wi);
+ tv.tv_sec = 0;
+ tv.tv_usec = 10;
+ evtimer_add(&wi->ev, &tv);
+ return;
}
}
@@ -507,6 +553,43 @@ queue_imsg(struct mproc *p, struct imsg *imsg)
}
static void
+queue_msgid_walk(int fd, short event, void *arg)
+{
+ struct envelope evp;
+ struct timeval tv;
+ struct msg_walkinfo *wi = arg;
+ int r;
+
+ r = queue_message_walk(&evp, wi->msgid, &wi->done, &wi->data);
+ if (r == -1) {
+ if (wi->n_evp) {
+ m_create(p_scheduler, IMSG_QUEUE_DISCOVER_MSGID,
+ 0, 0, -1);
+ m_add_msgid(p_scheduler, wi->msgid);
+ m_close(p_scheduler);
+ }
+
+ m_compose(p_control, IMSG_CTL_DISCOVER_MSGID, wi->peerid, 0, -1,
+ &wi->n_evp, sizeof wi->n_evp);
+ evtimer_del(&wi->ev);
+ free(wi);
+ return;
+ }
+
+ if (r) {
+ m_create(p_scheduler, IMSG_QUEUE_DISCOVER_EVPID, 0, 0, -1);
+ m_add_envelope(p_scheduler, &evp);
+ m_close(p_scheduler);
+ wi->n_evp += 1;
+ }
+
+ tv.tv_sec = 0;
+ tv.tv_usec = 10;
+ evtimer_set(&wi->ev, queue_msgid_walk, wi);
+ evtimer_add(&wi->ev, &tv);
+}
+
+static void
queue_bounce(struct envelope *e, struct delivery_bounce *d)
{
struct envelope b;