summaryrefslogtreecommitdiff
path: root/usr.sbin
diff options
context:
space:
mode:
authorEric Faurot <eric@cvs.openbsd.org>2012-11-20 09:47:47 +0000
committerEric Faurot <eric@cvs.openbsd.org>2012-11-20 09:47:47 +0000
commit65898a3024a49104a9e8123deec105b11de001e3 (patch)
tree657d34d40deed3c1977a7b942dc5e32505512802 /usr.sbin
parentf6aefaa94996adb3a94cf829d7d536f2457f186f (diff)
Allow "smtpctl show queue" to run in "online" mode if the smtpd server
is running. The scheduler sends the runtime state of each envelope to the queue process which loads the envelope, fills the runtime bits and sends the envelope back to the client. Iteration over the envelope set happens in small chunks to make the request interruptible and to allow the server to keep doing its job in the meantime. Adapt "smtpctl schedule-all" to schedule the messages one by one using the same iteration mechanism. Document "smtpctl monitor" and "smtpctl show queue". ok gilles@
Diffstat (limited to 'usr.sbin')
-rw-r--r--usr.sbin/smtpd/control.c40
-rw-r--r--usr.sbin/smtpd/queue.c36
-rw-r--r--usr.sbin/smtpd/scheduler.c64
-rw-r--r--usr.sbin/smtpd/scheduler_ramqueue.c259
-rw-r--r--usr.sbin/smtpd/smtpctl.8101
-rw-r--r--usr.sbin/smtpd/smtpctl.c251
-rw-r--r--usr.sbin/smtpd/smtpd.c4
-rw-r--r--usr.sbin/smtpd/smtpd.h21
-rw-r--r--usr.sbin/smtpd/tree.c34
9 files changed, 632 insertions, 178 deletions
diff --git a/usr.sbin/smtpd/control.c b/usr.sbin/smtpd/control.c
index e1eb8a925a9..86de6d06357 100644
--- a/usr.sbin/smtpd/control.c
+++ b/usr.sbin/smtpd/control.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: control.c,v 1.78 2012/11/12 14:58:53 eric Exp $ */
+/* $OpenBSD: control.c,v 1.79 2012/11/20 09:47:45 eric Exp $ */
/*
* Copyright (c) 2012 Gilles Chehade <gilles@openbsd.org>
@@ -94,6 +94,28 @@ control_imsg(struct imsgev *iev, struct imsg *imsg)
return;
}
}
+ if (iev->proc == PROC_SCHEDULER) {
+ switch (imsg->hdr.type) {
+ case IMSG_SCHEDULER_MESSAGES:
+ c = control_connbyfd(imsg->hdr.peerid);
+ if (c == NULL)
+ return;
+ imsg_compose_event(&c->iev, IMSG_SCHEDULER_MESSAGES, 0,
+ 0, -1, imsg->data, imsg->hdr.len-sizeof imsg->hdr);
+ return;
+ }
+ }
+ if (iev->proc == PROC_QUEUE) {
+ switch (imsg->hdr.type) {
+ case IMSG_SCHEDULER_ENVELOPES:
+ c = control_connbyfd(imsg->hdr.peerid);
+ if (c == NULL)
+ return;
+ imsg_compose_event(&c->iev, IMSG_SCHEDULER_ENVELOPES, 0,
+ 0, -1, imsg->data, imsg->hdr.len-sizeof imsg->hdr);
+ return;
+ }
+ }
switch (imsg->hdr.type) {
case IMSG_STAT_INCREMENT:
@@ -591,6 +613,22 @@ control_dispatch_ext(int fd, short event, void *arg)
imsg_compose_event(&c->iev, IMSG_CTL_OK, 0, 0, -1, NULL, 0);
break;
+ case IMSG_SCHEDULER_MESSAGES:
+ if (euid)
+ goto badcred;
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_SCHEDULER_MESSAGES, fd, 0, -1, imsg.data,
+ imsg.hdr.len - sizeof(imsg.hdr));
+ break;
+
+ case IMSG_SCHEDULER_ENVELOPES:
+ if (euid)
+ goto badcred;
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_SCHEDULER_ENVELOPES, fd, 0, -1, imsg.data,
+ imsg.hdr.len - sizeof(imsg.hdr));
+ break;
+
case IMSG_SCHEDULER_SCHEDULE:
if (euid)
goto badcred;
diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c
index a4f5a19ecec..8bde4360274 100644
--- a/usr.sbin/smtpd/queue.c
+++ b/usr.sbin/smtpd/queue.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: queue.c,v 1.142 2012/11/13 13:23:23 eric Exp $ */
+/* $OpenBSD: queue.c,v 1.143 2012/11/20 09:47:45 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -51,6 +51,7 @@ static void queue_sig_handler(int, short, void *);
static void
queue_imsg(struct imsgev *iev, struct imsg *imsg)
{
+ struct evpstate *state;
static uint64_t batch_id;
struct submit_status ss;
struct envelope *e, evp;
@@ -197,7 +198,38 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg)
IMSG_BATCH_CLOSE, 0, 0, -1,
&batch_id, sizeof batch_id);
return;
- }
+
+ case IMSG_SCHEDULER_ENVELOPES:
+ if (imsg->hdr.len == sizeof imsg->hdr) {
+ imsg_compose_event(env->sc_ievs[PROC_CONTROL],
+ IMSG_SCHEDULER_ENVELOPES, imsg->hdr.peerid,
+ 0, -1, NULL, 0);
+ return;
+ }
+ state = imsg->data;
+ if (queue_envelope_load(state->evpid, &evp) == 0)
+ return; /* Envelope is gone, drop it */
+ /*
+ * XXX consistency: The envelope might already be on
+ * its way back to the scheduler. We need to detect
+ * this properly and report that state.
+ */
+ evp.flags |= state->flags;
+ /* In the past if running or runnable */
+ evp.nexttry = state->time;
+ if (state->flags == DF_INFLIGHT) {
+ /*
+ * Not exactly correct but pretty close: The
+ * value is not recorded on the envelope unless
+ * a tempfail occurs.
+ */
+ evp.lasttry = state->time;
+ }
+ imsg_compose_event(env->sc_ievs[PROC_CONTROL],
+ IMSG_SCHEDULER_ENVELOPES, imsg->hdr.peerid, 0, -1,
+ &evp, sizeof evp);
+ return;
+ }
}
if (iev->proc == PROC_MTA || iev->proc == PROC_MDA) {
diff --git a/usr.sbin/smtpd/scheduler.c b/usr.sbin/smtpd/scheduler.c
index 44f0f15eec1..31cddeecaef 100644
--- a/usr.sbin/smtpd/scheduler.c
+++ b/usr.sbin/smtpd/scheduler.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: scheduler.c,v 1.23 2012/11/12 14:58:53 eric Exp $ */
+/* $OpenBSD: scheduler.c,v 1.24 2012/11/20 09:47:45 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -60,14 +60,18 @@ static struct scheduler_backend *backend = NULL;
extern const char *backend_scheduler;
+#define MSGBATCHSIZE 1024
+#define EVPBATCHSIZE 256
+
void
scheduler_imsg(struct imsgev *iev, struct imsg *imsg)
{
+ struct evpstate state[EVPBATCHSIZE];
struct envelope *e;
struct scheduler_info si;
uint64_t id;
- uint32_t msgid;
- size_t n;
+ uint32_t msgid, msgids[MSGBATCHSIZE];
+ size_t n, i;
switch (imsg->hdr.type) {
@@ -169,14 +173,33 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg)
log_verbose(*(int *)imsg->data);
return;
+ case IMSG_SCHEDULER_MESSAGES:
+ msgid = *(uint32_t *)(imsg->data);
+ n = backend->messages(msgid, msgids, MSGBATCHSIZE);
+ imsg_compose_event(iev, IMSG_SCHEDULER_MESSAGES,
+ imsg->hdr.peerid, 0, -1, msgids, n * sizeof (*msgids));
+ return;
+
+ case IMSG_SCHEDULER_ENVELOPES:
+ id = *(uint64_t *)(imsg->data);
+ n = backend->envelopes(id, state, EVPBATCHSIZE);
+ for (i = 0; i < n; i++) {
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE],
+ IMSG_SCHEDULER_ENVELOPES, imsg->hdr.peerid, 0, -1,
+ &state[i], sizeof state[i]);
+ }
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE],
+ IMSG_SCHEDULER_ENVELOPES, imsg->hdr.peerid, 0, -1, NULL, 0);
+ return;
+
case IMSG_SCHEDULER_SCHEDULE:
id = *(uint64_t *)(imsg->data);
- if (id == 0)
- log_debug("debug: scheduler: scheduling all envelopes");
- else if (id <= 0xffffffffL)
- log_debug("debug: scheduler: scheduling msg:%08" PRIx64, id);
+ if (id <= 0xffffffffL)
+ log_debug("debug: scheduler: "
+ "scheduling msg:%08" PRIx64, id);
else
- log_debug("debug: scheduler: scheduling evp:%016" PRIx64, id);
+ log_debug("debug: scheduler: "
+ "scheduling evp:%016" PRIx64, id);
backend->schedule(id);
scheduler_reset_events();
return;
@@ -184,15 +207,18 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg)
case IMSG_SCHEDULER_REMOVE:
id = *(uint64_t *)(imsg->data);
if (id <= 0xffffffffL)
- log_debug("debug: scheduler: removing msg:%08" PRIx64, id);
+ log_debug("debug: scheduler: "
+ "removing msg:%08" PRIx64, id);
else
- log_debug("debug: scheduler: removing evp:%016" PRIx64, id);
+ log_debug("debug: scheduler: "
+ "removing evp:%016" PRIx64, id);
backend->remove(id);
scheduler_reset_events();
return;
}
- errx(1, "scheduler_imsg: unexpected %s imsg", imsg_to_str(imsg->hdr.type));
+ errx(1, "scheduler_imsg: unexpected %s imsg",
+ imsg_to_str(imsg->hdr.type));
}
static void
@@ -349,7 +375,7 @@ scheduler_timeout(int fd, short event, void *p)
fatalx("scheduler_timeout: unknown batch type");
}
- evtimer_add(&env->sc_ev, &tv);
+ evtimer_add(&env->sc_ev, &tv);
}
static void
@@ -395,8 +421,8 @@ scheduler_process_bounce(struct scheduler_batch *batch)
while ((e = batch->evpids)) {
batch->evpids = e->next;
- log_debug("debug: scheduler: evp:%016" PRIx64 " scheduled (bounce)",
- e->id);
+ log_debug("debug: scheduler: evp:%016" PRIx64
+ " scheduled (bounce)", e->id);
imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_SMTP_ENQUEUE,
0, 0, -1, &e->id, sizeof e->id);
free(e);
@@ -412,8 +438,8 @@ scheduler_process_mda(struct scheduler_batch *batch)
while ((e = batch->evpids)) {
batch->evpids = e->next;
- log_debug("debug: scheduler: evp:%016" PRIx64 " scheduled (mda)",
- e->id);
+ log_debug("debug: scheduler: evp:%016" PRIx64
+ " scheduled (mda)", e->id);
imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_MDA_SESS_NEW,
0, 0, -1, &e->id, sizeof e->id);
free(e);
@@ -425,15 +451,15 @@ scheduler_process_mda(struct scheduler_batch *batch)
static void
scheduler_process_mta(struct scheduler_batch *batch)
{
- struct id_list *e;
+ struct id_list *e;
imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_BATCH_CREATE,
0, 0, -1, NULL, 0);
while ((e = batch->evpids)) {
batch->evpids = e->next;
- log_debug("debug: scheduler: evp:%016" PRIx64 " scheduled (mta)",
- e->id);
+ log_debug("debug: scheduler: evp:%016" PRIx64
+ " scheduled (mta)", e->id);
imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_BATCH_APPEND,
0, 0, -1, &e->id, sizeof e->id);
free(e);
diff --git a/usr.sbin/smtpd/scheduler_ramqueue.c b/usr.sbin/smtpd/scheduler_ramqueue.c
index 055d2c8b4d2..e073c4b35d1 100644
--- a/usr.sbin/smtpd/scheduler_ramqueue.c
+++ b/usr.sbin/smtpd/scheduler_ramqueue.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: scheduler_ramqueue.c,v 1.24 2012/11/12 14:58:53 eric Exp $ */
+/* $OpenBSD: scheduler_ramqueue.c,v 1.25 2012/11/20 09:47:45 eric Exp $ */
/*
* Copyright (c) 2012 Gilles Chehade <gilles@openbsd.org>
@@ -42,8 +42,8 @@ TAILQ_HEAD(evplist, rq_envelope);
struct rq_message {
uint32_t msgid;
struct tree envelopes;
- struct rq_message *sched_next;
- struct rq_envelope *sched_mta;
+ struct rq_message *q_next;
+ struct evplist q_mta;
};
struct rq_envelope {
@@ -64,7 +64,6 @@ struct rq_envelope {
struct rq_message *message;
- struct rq_envelope *sched_next;
time_t t_inflight;
time_t t_scheduled;
};
@@ -73,14 +72,14 @@ struct rq_queue {
size_t evpcount;
struct tree messages;
- struct evplist pending;
-
- struct rq_message *sched_mta;
- struct rq_envelope *sched_mda;
- struct rq_envelope *sched_bounce;
- struct rq_envelope *sched_expired;
- struct rq_envelope *sched_removed;
+ struct evplist q_pending;
+ struct evplist q_inflight;
+ struct rq_message *q_mtabatch;
+ struct evplist q_mda;
+ struct evplist q_bounce;
+ struct evplist q_expired;
+ struct evplist q_removed;
};
static void scheduler_ramqueue_init(void);
@@ -90,6 +89,8 @@ static size_t scheduler_ramqueue_rollback(uint32_t);
static void scheduler_ramqueue_update(struct scheduler_info *);
static void scheduler_ramqueue_delete(uint64_t);
static void scheduler_ramqueue_batch(int, struct scheduler_batch *);
+static size_t scheduler_ramqueue_messages(uint32_t, uint32_t *, size_t);
+static size_t scheduler_ramqueue_envelopes(uint64_t, struct evpstate *, size_t);
static void scheduler_ramqueue_schedule(uint64_t);
static void scheduler_ramqueue_remove(uint64_t);
@@ -117,6 +118,8 @@ struct scheduler_backend scheduler_backend_ramqueue = {
scheduler_ramqueue_batch,
+ scheduler_ramqueue_messages,
+ scheduler_ramqueue_envelopes,
scheduler_ramqueue_schedule,
scheduler_ramqueue_remove,
};
@@ -124,7 +127,7 @@ struct scheduler_backend scheduler_backend_ramqueue = {
static struct rq_queue ramqueue;
static struct tree updates;
-static time_t currtime;
+static time_t currtime;
extern int verbose;
@@ -159,6 +162,7 @@ scheduler_ramqueue_insert(struct scheduler_info *si)
if ((message = tree_get(&update->messages, msgid)) == NULL) {
message = xcalloc(1, sizeof *message, "scheduler_insert");
message->msgid = msgid;
+ TAILQ_INIT(&message->q_mta);
tree_init(&message->envelopes);
tree_xset(&update->messages, msgid, message);
stat_increment("scheduler.ramqueue.message", 1);
@@ -177,7 +181,7 @@ scheduler_ramqueue_insert(struct scheduler_info *si)
stat_increment("scheduler.ramqueue.envelope", 1);
envelope->flags = RQ_ENVELOPE_PENDING;
- sorted_insert(&update->pending, envelope);
+ sorted_insert(&update->q_pending, envelope);
}
static size_t
@@ -195,6 +199,10 @@ scheduler_ramqueue_commit(uint32_t msgid)
rq_queue_dump(update, "update to commit");
rq_queue_merge(&ramqueue, update);
+
+ if (verbose & TRACE_SCHEDULER)
+ rq_queue_dump(&ramqueue, "resulting queue");
+
rq_queue_schedule(&ramqueue);
free(update);
@@ -216,8 +224,8 @@ scheduler_ramqueue_rollback(uint32_t msgid)
return (0);
r = update->evpcount;
- while ((evp = TAILQ_FIRST(&update->pending))) {
- TAILQ_REMOVE(&update->pending, evp, entry);
+ while ((evp = TAILQ_FIRST(&update->q_pending))) {
+ TAILQ_REMOVE(&update->q_pending, evp, entry);
rq_envelope_delete(update, evp);
}
@@ -247,9 +255,10 @@ scheduler_ramqueue_update(struct scheduler_info *si)
while ((evp->sched = scheduler_compute_schedule(si)) <= currtime)
si->retry += 1;
+ TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);
+ sorted_insert(&ramqueue.q_pending, evp);
evp->flags &= ~RQ_ENVELOPE_INFLIGHT;
evp->flags |= RQ_ENVELOPE_PENDING;
- sorted_insert(&ramqueue.pending, evp);
}
static void
@@ -269,6 +278,7 @@ scheduler_ramqueue_delete(uint64_t evpid)
if (!(evp->flags & RQ_ENVELOPE_INFLIGHT))
errx(1, "evp:%016" PRIx64 " not in-flight", evpid);
+ TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);
evp->flags &= ~RQ_ENVELOPE_INFLIGHT;
rq_envelope_delete(&ramqueue, evp);
}
@@ -276,7 +286,9 @@ scheduler_ramqueue_delete(uint64_t evpid)
static void
scheduler_ramqueue_batch(int typemask, struct scheduler_batch *ret)
{
- struct rq_envelope *evp, *tmp, **batch;
+ struct evplist *q;
+ struct rq_envelope *evp;
+ struct rq_message *msg;
struct id_list *item;
currtime = time(NULL);
@@ -285,28 +297,30 @@ scheduler_ramqueue_batch(int typemask, struct scheduler_batch *ret)
if (verbose & TRACE_SCHEDULER)
rq_queue_dump(&ramqueue, "scheduler_ramqueue_batch()");
- if (typemask & SCHED_REMOVE && ramqueue.sched_removed) {
- batch = &ramqueue.sched_removed;
+ if (typemask & SCHED_REMOVE && TAILQ_FIRST(&ramqueue.q_removed)) {
+ q = &ramqueue.q_removed;
ret->type = SCHED_REMOVE;
}
- else if (typemask & SCHED_EXPIRE && ramqueue.sched_expired) {
- batch = &ramqueue.sched_expired;
+ else if (typemask & SCHED_EXPIRE && TAILQ_FIRST(&ramqueue.q_expired)) {
+ q = &ramqueue.q_expired;
ret->type = SCHED_EXPIRE;
}
- else if (typemask & SCHED_BOUNCE && ramqueue.sched_bounce) {
- batch = &ramqueue.sched_bounce;
+ else if (typemask & SCHED_BOUNCE && TAILQ_FIRST(&ramqueue.q_bounce)) {
+ q = &ramqueue.q_bounce;
ret->type = SCHED_BOUNCE;
}
- else if (typemask & SCHED_MDA && ramqueue.sched_mda) {
- batch = &ramqueue.sched_mda;
+ else if (typemask & SCHED_MDA && TAILQ_FIRST(&ramqueue.q_mda)) {
+ q = &ramqueue.q_mda;
ret->type = SCHED_MDA;
}
- else if (typemask & SCHED_MTA && ramqueue.sched_mta) {
- batch = &ramqueue.sched_mta->sched_mta;
- ramqueue.sched_mta = ramqueue.sched_mta->sched_next;
+ else if (typemask & SCHED_MTA && ramqueue.q_mtabatch) {
+ msg = ramqueue.q_mtabatch;
+ ramqueue.q_mtabatch = msg->q_next;
+ msg->q_next = NULL;
+ q = &msg->q_mta;
ret->type = SCHED_MTA;
}
- else if ((evp = TAILQ_FIRST(&ramqueue.pending))) {
+ else if ((evp = TAILQ_FIRST(&ramqueue.q_pending))) {
ret->type = SCHED_DELAY;
if (evp->sched < evp->expire)
ret->delay = evp->sched - currtime;
@@ -321,8 +335,10 @@ scheduler_ramqueue_batch(int typemask, struct scheduler_batch *ret)
ret->evpids = NULL;
ret->evpcount = 0;
- for(evp = *batch; evp; evp = tmp) {
- tmp = evp->sched_next;
+
+ while ((evp = TAILQ_FIRST(q))) {
+
+ TAILQ_REMOVE(q, evp, entry);
/* consistency check */
if (!(evp->flags & RQ_ENVELOPE_SCHEDULED))
@@ -332,18 +348,17 @@ scheduler_ramqueue_batch(int typemask, struct scheduler_batch *ret)
item->id = evp->evpid;
item->next = ret->evpids;
ret->evpids = item;
- evp->sched_next = NULL;
+
if (ret->type == SCHED_REMOVE || ret->type == SCHED_EXPIRE)
rq_envelope_delete(&ramqueue, evp);
else {
+ TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
evp->flags &= ~RQ_ENVELOPE_SCHEDULED;
evp->flags |= RQ_ENVELOPE_INFLIGHT;
evp->t_inflight = currtime;
}
ret->evpcount++;
}
-
- *batch = NULL;
}
static void
@@ -352,26 +367,18 @@ scheduler_ramqueue_schedule(uint64_t evpid)
struct rq_message *msg;
struct rq_envelope *evp;
uint32_t msgid;
- void *i, *j;
+ void *i;
currtime = time(NULL);
- if (evpid == 0) {
- j = NULL;
- while (tree_iter(&ramqueue.messages, &j, NULL, (void*)(&msg))) {
- i = NULL;
- while (tree_iter(&msg->envelopes, &i, NULL,
- (void*)(&evp)))
- rq_envelope_schedule(&ramqueue, evp);
- }
- }
- else if (evpid > 0xffffffff) {
+ if (evpid > 0xffffffff) {
msgid = evpid_to_msgid(evpid);
if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
return;
if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
return;
- rq_envelope_schedule(&ramqueue, evp);
+ if (evp->flags & RQ_ENVELOPE_PENDING)
+ rq_envelope_schedule(&ramqueue, evp);
}
else {
msgid = evpid;
@@ -379,7 +386,8 @@ scheduler_ramqueue_schedule(uint64_t evpid)
return;
i = NULL;
while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
- rq_envelope_schedule(&ramqueue, evp);
+ if (evp->flags & RQ_ENVELOPE_PENDING)
+ rq_envelope_schedule(&ramqueue, evp);
}
}
@@ -411,6 +419,62 @@ scheduler_ramqueue_remove(uint64_t evpid)
}
}
+static size_t
+scheduler_ramqueue_messages(uint32_t from, uint32_t *dst, size_t size)
+{
+ uint64_t id;
+ size_t n;
+ void *i;
+
+ for (n = 0, i = NULL; n < size; n++) {
+ if (tree_iterfrom(&ramqueue.messages, &i, from, &id, NULL) == 0)
+ break;
+ dst[n] = id;
+ }
+
+ return (n);
+}
+
+static size_t
+scheduler_ramqueue_envelopes(uint64_t from, struct evpstate *dst, size_t size)
+{
+ struct rq_message *msg;
+ struct rq_envelope *evp;
+ void *i;
+ size_t n;
+
+ if ((msg = tree_get(&ramqueue.messages, evpid_to_msgid(from))) == NULL)
+ return (0);
+
+ for (n = 0, i = NULL; n < size; ) {
+
+ if (tree_iterfrom(&msg->envelopes, &i, from, NULL,
+ (void**)&evp) == 0)
+ break;
+
+ if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
+ continue;
+
+ dst[n].retry = 0;
+ dst[n].evpid = evp->evpid;
+ if (evp->flags & RQ_ENVELOPE_PENDING) {
+ dst[n].time = evp->sched;
+ dst[n].flags = DF_PENDING;
+ }
+ else if (evp->flags & RQ_ENVELOPE_SCHEDULED) {
+ dst[n].time = evp->t_scheduled;
+ dst[n].flags = DF_PENDING;
+ }
+ else if (evp->flags & RQ_ENVELOPE_INFLIGHT) {
+ dst[n].time = evp->t_inflight;
+ dst[n].flags = DF_INFLIGHT;
+ }
+ n++;
+ }
+
+ return (n);
+}
+
static void
sorted_insert(struct evplist *list, struct rq_envelope *evp)
{
@@ -444,7 +508,12 @@ rq_queue_init(struct rq_queue *rq)
{
bzero(rq, sizeof *rq);
tree_init(&rq->messages);
- TAILQ_INIT(&rq->pending);
+ TAILQ_INIT(&rq->q_pending);
+ TAILQ_INIT(&rq->q_inflight);
+ TAILQ_INIT(&rq->q_mda);
+ TAILQ_INIT(&rq->q_bounce);
+ TAILQ_INIT(&rq->q_expired);
+ TAILQ_INIT(&rq->q_removed);
}
static void
@@ -463,7 +532,7 @@ rq_queue_merge(struct rq_queue *rq, struct rq_queue *update)
}
/* need to re-link all envelopes before merging them */
i = NULL;
- while((tree_iter(&message->envelopes, &i, &id,
+ while ((tree_iter(&message->envelopes, &i, &id,
(void*)&envelope)))
envelope->message = tomessage;
tree_merge(&tomessage->envelopes, &message->envelopes);
@@ -471,7 +540,8 @@ rq_queue_merge(struct rq_queue *rq, struct rq_queue *update)
stat_decrement("scheduler.ramqueue.message", 1);
}
- sorted_merge(&rq->pending, &update->pending);
+ sorted_merge(&rq->q_pending, &update->q_pending);
+ rq->evpcount += update->evpcount;
}
static void
@@ -479,23 +549,21 @@ rq_queue_schedule(struct rq_queue *rq)
{
struct rq_envelope *evp;
- while ((evp = TAILQ_FIRST(&rq->pending))) {
+ while ((evp = TAILQ_FIRST(&rq->q_pending))) {
if (evp->sched > currtime && evp->expire > currtime)
break;
- /* it *must* be pending */
if (evp->flags != RQ_ENVELOPE_PENDING)
errx(1, "evp:%016" PRIx64 " flags=0x%x", evp->evpid,
evp->flags);
if (evp->expire <= currtime) {
- TAILQ_REMOVE(&rq->pending, evp, entry);
+ TAILQ_REMOVE(&rq->q_pending, evp, entry);
+ TAILQ_INSERT_TAIL(&rq->q_expired, evp, entry);
evp->flags &= ~RQ_ENVELOPE_PENDING;
evp->flags |= RQ_ENVELOPE_EXPIRED;
evp->flags |= RQ_ENVELOPE_SCHEDULED;
evp->t_scheduled = currtime;
- evp->sched_next = rq->sched_expired;
- rq->sched_expired = evp;
continue;
}
rq_envelope_schedule(rq, evp);
@@ -505,28 +573,22 @@ rq_queue_schedule(struct rq_queue *rq)
static void
rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
{
- if (evp->flags & (RQ_ENVELOPE_SCHEDULED | RQ_ENVELOPE_INFLIGHT))
- return;
-
- if (evp->flags & RQ_ENVELOPE_PENDING)
- TAILQ_REMOVE(&rq->pending, evp, entry);
+ struct evplist *q = NULL;
if (evp->type == D_MTA) {
- if (evp->message->sched_mta == NULL) {
- evp->message->sched_next = rq->sched_mta;
- rq->sched_mta = evp->message;
+ if (TAILQ_EMPTY(&evp->message->q_mta)) {
+ evp->message->q_next = rq->q_mtabatch;
+ rq->q_mtabatch = evp->message;
}
- evp->sched_next = evp->message->sched_mta;
- evp->message->sched_mta = evp;
- }
- else if (evp->type == D_MDA) {
- evp->sched_next = rq->sched_mda;
- rq->sched_mda = evp;
- }
- else if (evp->type == D_BOUNCE) {
- evp->sched_next = rq->sched_bounce;
- rq->sched_bounce = evp;
+ q = &evp->message->q_mta;
}
+ else if (evp->type == D_MDA)
+ q = &rq->q_mda;
+ else if (evp->type == D_BOUNCE)
+ q = &rq->q_bounce;
+
+ TAILQ_REMOVE(&rq->q_pending, evp, entry);
+ TAILQ_INSERT_TAIL(q, evp, entry);
evp->flags &= ~RQ_ENVELOPE_PENDING;
evp->flags |= RQ_ENVELOPE_SCHEDULED;
evp->t_scheduled = currtime;
@@ -535,16 +597,51 @@ rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
static void
rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp)
{
- if (!(evp->flags & (RQ_ENVELOPE_PENDING)))
+ struct rq_message *m;
+ struct evplist *q = NULL;
+
+ if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
+ return;
+ /*
+ * For now we just ignore it, but we could mark the envelope for
+ * removal and possibly send a cancellation to the agent.
+ */
+ if (evp->flags & (RQ_ENVELOPE_INFLIGHT))
return;
- TAILQ_REMOVE(&rq->pending, evp, entry);
- evp->sched_next = rq->sched_removed;
- rq->sched_removed = evp;
+ if (evp->flags & RQ_ENVELOPE_SCHEDULED) {
+ if (evp->type == D_MTA)
+ q = &evp->message->q_mta;
+ else if (evp->type == D_MDA)
+ q = &rq->q_mda;
+ else if (evp->type == D_BOUNCE)
+ q = &rq->q_bounce;
+ } else
+ q = &rq->q_pending;
+
+ TAILQ_REMOVE(q, evp, entry);
+ TAILQ_INSERT_TAIL(&rq->q_removed, evp, entry);
evp->flags &= ~RQ_ENVELOPE_PENDING;
evp->flags |= RQ_ENVELOPE_REMOVED;
evp->flags |= RQ_ENVELOPE_SCHEDULED;
evp->t_scheduled = currtime;
+
+ /*
+ * We might need to unschedule the message if it was the only
+ * scheduled envelope
+ */
+ if (q == &evp->message->q_mta && TAILQ_EMPTY(q)) {
+ if (rq->q_mtabatch == evp->message)
+ rq->q_mtabatch = evp->message->q_next;
+ else {
+ for (m = rq->q_mtabatch; m->q_next; m = m->q_next)
+ if (m->q_next == evp->message) {
+ m->q_next = evp->message->q_next;
+ break;
+ }
+ }
+ evp->message->q_next = NULL;
+ }
}
static void
@@ -558,6 +655,7 @@ rq_envelope_delete(struct rq_queue *rq, struct rq_envelope *evp)
}
free(evp);
+ rq->evpcount--;
stat_decrement("scheduler.ramqueue.envelope", 1);
}
@@ -576,7 +674,8 @@ rq_envelope_to_text(struct rq_envelope *e)
else if (e->type == D_MTA)
strlcat(buf, "mta", sizeof buf);
- snprintf(t, sizeof t, ",expire=%s", duration_to_text(e->expire - currtime));
+ snprintf(t, sizeof t, ",expire=%s",
+ duration_to_text(e->expire - currtime));
strlcat(buf, t, sizeof buf);
if (e->flags & RQ_ENVELOPE_PENDING) {
@@ -615,10 +714,10 @@ rq_queue_dump(struct rq_queue *rq, const char * name)
log_debug("debug: /--- ramqueue: %s", name);
i = NULL;
- while((tree_iter(&rq->messages, &i, &id, (void*)&message))) {
+ while ((tree_iter(&rq->messages, &i, &id, (void*)&message))) {
log_debug("debug: | msg:%08" PRIx32, message->msgid);
j = NULL;
- while((tree_iter(&message->envelopes, &j, &id,
+ while ((tree_iter(&message->envelopes, &j, &id,
(void*)&envelope)))
log_debug("debug: | %s",
rq_envelope_to_text(envelope));
diff --git a/usr.sbin/smtpd/smtpctl.8 b/usr.sbin/smtpd/smtpctl.8
index 96b9637fd05..b8db5ed7586 100644
--- a/usr.sbin/smtpd/smtpctl.8
+++ b/usr.sbin/smtpd/smtpctl.8
@@ -1,4 +1,4 @@
-.\" $OpenBSD: smtpctl.8,v 1.34 2012/10/17 08:38:48 eric Exp $
+.\" $OpenBSD: smtpctl.8,v 1.35 2012/11/20 09:47:46 eric Exp $
.\"
.\" Copyright (c) 2006 Pierre-Yves Ritschard <pyr@openbsd.org>
.\"
@@ -14,7 +14,7 @@
.\" ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
.\" OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
.\"
-.Dd $Mdocdate: October 17 2012 $
+.Dd $Mdocdate: November 20 2012 $
.Dt SMTPCTL 8
.Os
.Sh NAME
@@ -40,6 +40,43 @@ The following commands are available:
Disable verbose debug logging.
.It Cm log verbose
Enable verbose debug logging.
+.It Cm monitor
+Display updates of some
+.Xr smtpd 8
+internal counters in one second intervals.
+Each line reports the increment of all counters since the last update,
+except for some counters which are always absolute values.
+The first line reports the current value of each counter.
+The fields are:
+.Pp
+.Bl -compact -bullet
+.It
+Current number of active SMTP clients (absolute value).
+.It
+New SMTP clients.
+.It
+Disconnected clients.
+.It
+Current number of envelopes in the queue (absolute value).
+.It
+Newly enqueued envelopes.
+.It
+Dequeued envelopes.
+.It
+Successful deliveries.
+.It
+Temporary failures.
+.It
+Permanent failures.
+.It
+Message loops.
+.It
+Expired envelopes.
+.It
+Envelopes removed by the administrator.
+.It
+Generated bounces.
+.El
.It Cm pause mda
Temporarily stop deliveries to local users.
.It Cm pause mta
@@ -48,7 +85,7 @@ remote users.
.It Cm pause smtp
Temporarily stop accepting incoming sessions.
.It Cm remove Ar envelope-id | message-id
-Removes a single envelope, or all envelopes with the same message ID.
+Remove a single envelope, or all envelopes with the same message ID.
.It Cm resume mda
Resume deliveries to local users.
.It Cm resume mta
@@ -56,26 +93,64 @@ Resume relaying and deliveries to remote users.
.It Cm resume smtp
Resume accepting incoming sessions.
.It Cm schedule-all
-Marks all envelopes as ready for immediate delivery.
+Mark all envelopes as ready for immediate delivery.
.It Cm schedule-id Ar envelope-id | message-id
-Marks a single envelope, or all envelopes with the same message ID,
+Mark a single envelope, or all envelopes with the same message ID,
as ready for immediate delivery.
.It Cm show envelope Ar envelope-id
-Displays envelope's content for the given ID.
+Display envelope content for the given ID.
.It Cm show message Ar envelope-id
-Displays message content for the given ID.
+Display message content for the given ID.
.It Cm show queue
-Displays information concerning envelopes
-that are currently in a queue.
+Display information concerning envelopes that are currently in the queue.
+Each line of output describes a single envelope.
+It consists of the following fields, separated by a "|":
+.Pp
+.Bl -compact -bullet
+.It
+Envelope id.
+.It
+Address family of the client which enqueued the mail.
+.It
+Type of delivery: one of "mta", "mda" or "bounce".
+.It
+Various flags on the envelope.
+.It
+Sender address (return path).
+.It
+The original recipient address.
+.It
+The destination address.
+.It
+Time of creation.
+.It
+Time of expiration.
+.It
+Time of last delivery or relaying attempt.
+.It
+Number of delivery or relaying attempts.
+.It
+Current runstate: either "pending" or "inflight" if
+.Xr smtpd 8
+is running, or "offline" otherwise.
+.It
+Delay in seconds before the next attempt if pending, or time elapsed
+if currently running.
+This field is blank if
+.Xr smtpd 8
+is not running.
+.It
+Error string for the last failed delivery or relay attempt.
+.El
.It Cm show stats
Displays runtime statistics concerning
.Xr smtpd 8 .
-.It Cm update map Ar name
-For map backends that provide caching, causes
-.Xr smtpd 8
-to update the cache.
.It Cm stop
Stop the server.
+.It Cm update table Ar name
+For table backends that provide caching, causes
+.Xr smtpd 8
+to update the cache.
.El
.Pp
When
diff --git a/usr.sbin/smtpd/smtpctl.c b/usr.sbin/smtpd/smtpctl.c
index dca5f2941dc..3ecf6c54ec1 100644
--- a/usr.sbin/smtpd/smtpctl.c
+++ b/usr.sbin/smtpd/smtpctl.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: smtpctl.c,v 1.96 2012/11/13 09:35:18 eric Exp $ */
+/* $OpenBSD: smtpctl.c,v 1.97 2012/11/20 09:47:46 eric Exp $ */
/*
* Copyright (c) 2006 Pierre-Yves Ritschard <pyr@openbsd.org>
@@ -61,6 +61,10 @@ static void show_monitor(struct stat_digest *);
static int try_connect(void);
static void flush(void);
static void next_message(struct imsg *);
+static int action_schedule_all(void);
+
+static int action_show_queue(void);
+static int action_show_queue_message(uint32_t);
int proctype;
struct imsgbuf *ibuf;
@@ -70,16 +74,19 @@ extern char *__progname;
struct smtpd *env = NULL;
+time_t now;
+
__dead void
usage(void)
{
extern char *__progname;
if (sendmail)
- fprintf(stderr, "usage: %s [-tv] [-f from] [-F name] to ..\n",
+ fprintf(stderr, "usage: %s [-tv] [-f from] [-F name] to ...\n",
__progname);
else
- fprintf(stderr, "usage: %s command [argument ...]\n", __progname);
+ fprintf(stderr, "usage: %s command [argument ...]\n",
+ __progname);
exit(1);
}
@@ -139,7 +146,7 @@ next_message(struct imsg *imsg)
{
ssize_t n;
- while(1) {
+ while (1) {
if ((n = imsg_get(ibuf, imsg)) == -1)
errx(1, "imsg_get error");
if (n)
@@ -175,7 +182,6 @@ main(int argc, char *argv[])
if (geteuid())
errx(1, "need root privileges");
- setup_env(&smtpd);
if (strcmp(__progname, "mailq") == 0)
action = SHOW_QUEUE;
@@ -186,24 +192,26 @@ main(int argc, char *argv[])
} else
errx(1, "unsupported mode");
- /* test for not connected actions */
- switch (action) {
- case SHOW_QUEUE:
- show_queue(0);
- return (0);
- case SHOW_ENVELOPE:
- show_envelope(res->data);
- return (0);
- case SHOW_MESSAGE:
- show_message(res->data);
+ if (action == SHOW_ENVELOPE ||
+ action == SHOW_MESSAGE ||
+ !try_connect()) {
+ setup_env(&smtpd);
+ switch (action) {
+ case SHOW_QUEUE:
+ show_queue(0);
+ break;
+ case SHOW_ENVELOPE:
+ show_envelope(res->data);
+ break;
+ case SHOW_MESSAGE:
+ show_message(res->data);
+ break;
+ default:
+ errx(1, "smtpd doesn't seem to be running");
+ }
return (0);
- default:
- break;
}
- if (!try_connect())
- errx(1, "smtpd doesn't seem to be running");
-
/* process user request */
switch (action) {
case NONE:
@@ -220,11 +228,10 @@ main(int argc, char *argv[])
imsg_compose(ibuf, IMSG_SCHEDULER_REMOVE, 0, 0, -1, &ulval,
sizeof(ulval));
break;
+ case SHOW_QUEUE:
+ return action_show_queue();
case SCHEDULE_ALL:
- ulval = 0;
- imsg_compose(ibuf, IMSG_SCHEDULER_SCHEDULE, 0, 0, -1, &ulval,
- sizeof(ulval));
- break;
+ return action_schedule_all();
case SHUTDOWN:
imsg_compose(ibuf, IMSG_CTL_SHUTDOWN, 0, 0, -1, NULL, 0);
break;
@@ -280,15 +287,13 @@ main(int argc, char *argv[])
errx(1, "unknown request (%d)", action);
}
- while (!done) {
-
+ do {
flush();
next_message(&imsg);
- switch(action) {
+ switch (action) {
case REMOVE:
case SCHEDULE:
- case SCHEDULE_ALL:
case SHUTDOWN:
case PAUSE_MDA:
case PAUSE_MTA:
@@ -315,12 +320,129 @@ main(int argc, char *argv[])
}
imsg_free(&imsg);
- }
+ } while (!done);
free(ibuf);
return (0);
}
+
+/*
+ * Print every envelope of message "msgid" as reported by the server.
+ *
+ * Envelopes are requested in batches with IMSG_SCHEDULER_ENVELOPES, each
+ * request carrying the envelope id to start from.  Every reply that has a
+ * payload is treated as one struct envelope and printed; a reply consisting
+ * of the header only ends the current batch.
+ *
+ * Always returns 0.  Exits through errx() if a reply of any other type is
+ * received.
+ */
+static int
+action_show_queue_message(uint32_t msgid)
+{
+ struct imsg imsg;
+ struct envelope *evp;
+ uint64_t evpid;
+ size_t found;
+
+ /* first envelope id to ask for, derived from the message id */
+ evpid = msgid_to_evpid(msgid);
+
+ nextbatch:
+
+ /* "found" counts the envelopes received in the current batch only */
+ found = 0;
+ imsg_compose(ibuf, IMSG_SCHEDULER_ENVELOPES, 0, 0, -1,
+ &evpid, sizeof evpid);
+ flush();
+
+ while (1) {
+ next_message(&imsg);
+ if (imsg.hdr.type != IMSG_SCHEDULER_ENVELOPES)
+ errx(1, "unexpected message %i", imsg.hdr.type);
+
+ /* header-only reply: end of this batch */
+ if (imsg.hdr.len == sizeof imsg.hdr) {
+ imsg_free(&imsg);
+ /*
+ * Stop when the batch was empty, or when the id following
+ * the last envelope seen no longer maps to this message.
+ * Otherwise resume from that following id.
+ */
+ if (!found || evpid_to_msgid(++evpid) != msgid)
+ return (0);
+ goto nextbatch;
+ }
+ found++;
+ /* NOTE(review): payload size is not checked against sizeof(*evp) */
+ evp = imsg.data;
+ /* remember the last id seen so the next batch can resume after it */
+ evpid = evp->id;
+ /* second argument is the "online" flag of show_queue_envelope() */
+ show_queue_envelope(evp, 1);
+ imsg_free(&imsg);
+ }
+
+}
+
+/*
+ * Show the whole queue of a running server.
+ *
+ * Message ids are fetched in chunks with IMSG_SCHEDULER_MESSAGES, each
+ * request carrying the id to start from, and every id received is handed
+ * to action_show_queue_message().  Iteration ends on an empty chunk or
+ * when the next starting id wraps around to 0.
+ *
+ * Sets the global "now" before iterating.  Always returns 0; exits through
+ * errx() on a reply of an unexpected type.
+ */
+static int
+action_show_queue(void)
+{
+	struct imsg	 imsg;
+	uint32_t	*batch, msgid;
+	size_t		 idx, count;
+
+	now = time(NULL);
+	msgid = 0;
+
+	for (;;) {
+		imsg_compose(ibuf, IMSG_SCHEDULER_MESSAGES, 0, 0, -1,
+		    &msgid, sizeof msgid);
+		flush();
+		next_message(&imsg);
+		if (imsg.hdr.type != IMSG_SCHEDULER_MESSAGES)
+			errx(1, "unexpected message type %i", imsg.hdr.type);
+
+		batch = imsg.data;
+		count = (imsg.hdr.len - sizeof imsg.hdr) / sizeof (*batch);
+
+		/* msgid ends up holding the last id of the chunk */
+		for (idx = 0; idx < count; idx++) {
+			msgid = batch[idx];
+			action_show_queue_message(msgid);
+		}
+		imsg_free(&imsg);
+
+		/* empty chunk, or the id space wrapped: we are done */
+		if (count == 0 || ++msgid == 0)
+			break;
+	}
+
+	return (0);
+}
+
+/*
+ * Ask a running server to schedule every message it knows about.
+ *
+ * Message ids are fetched in chunks with IMSG_SCHEDULER_MESSAGES, each
+ * request carrying the id to start from.  One IMSG_SCHEDULER_SCHEDULE
+ * request is sent per id in the chunk, then one IMSG_CTL_OK reply is
+ * awaited per request.  Iteration ends on an empty chunk or when the next
+ * starting id wraps around to 0.
+ *
+ * Every message obtained from next_message() is released with imsg_free(),
+ * as action_show_queue() and main() do.
+ *
+ * Always returns 0; exits through errx() on a reply of an unexpected type.
+ */
+static int
+action_schedule_all(void)
+{
+	struct imsg	 imsg;
+	uint32_t	*msgids, from;
+	uint64_t	 evpid;
+	size_t		 i, n;
+
+	from = 0;
+	while (1) {
+		imsg_compose(ibuf, IMSG_SCHEDULER_MESSAGES, 0, 0, -1,
+		    &from, sizeof from);
+		flush();
+		next_message(&imsg);
+		if (imsg.hdr.type != IMSG_SCHEDULER_MESSAGES)
+			errx(1, "unexpected message type %i", imsg.hdr.type);
+		msgids = imsg.data;
+		n = (imsg.hdr.len - sizeof imsg.hdr) / sizeof (*msgids);
+		if (n == 0) {
+			/* empty chunk: nothing left, release the reply */
+			imsg_free(&imsg);
+			break;
+		}
+
+		for (i = 0; i < n; i++) {
+			/* the 32-bit id is widened to the 64-bit id sent */
+			evpid = msgids[i];
+			imsg_compose(ibuf, IMSG_SCHEDULER_SCHEDULE, 0,
+			    0, -1, &evpid, sizeof(evpid));
+		}
+		/* must be read before the chunk is freed below */
+		from = msgids[n - 1] + 1;
+
+		imsg_free(&imsg);
+		flush();
+
+		/* collect one acknowledgement per request sent above */
+		for (i = 0; i < n; i++) {
+			next_message(&imsg);
+			if (imsg.hdr.type != IMSG_CTL_OK)
+				errx(1, "unexpected message type %i",
+				    imsg.hdr.type);
+			imsg_free(&imsg);
+		}
+
+		/* the id space wrapped around */
+		if (from == 0)
+			break;
+	}
+
+	return (0);
+}
+
static int
show_command_output(struct imsg *imsg)
{
@@ -346,7 +468,7 @@ show_stats_output(void)
bzero(&kv, sizeof kv);
- while(1) {
+ while (1) {
imsg_compose(ibuf, IMSG_STATS_GET, 0, 0, -1, &kv, sizeof kv);
flush();
next_message(&imsg);
@@ -361,7 +483,7 @@ show_stats_output(void)
if (strcmp(kvp->key, "uptime") == 0) {
duration = time(NULL) - kvp->val.u.counter;
- printf("uptime=%zd\n", (size_t)duration);
+ printf("uptime=%zd\n", (size_t)duration);
printf("uptime.human=%s\n",
duration_to_text(duration));
}
@@ -418,11 +540,12 @@ show_queue(int flags)
qwalk_close(q);
}
+
static void
-show_queue_envelope(struct envelope *e, int flags)
+show_queue_envelope(struct envelope *e, int online)
{
- const char *src = "?";
- char status[128];
+ const char *src = "?", *agent = "?";
+ char status[128], runstate[128];
status[0] = '\0';
@@ -433,28 +556,33 @@ show_queue_envelope(struct envelope *e, int flags)
getflag(&e->flags, DF_INTERNAL, "internal",
status, sizeof(status));
+ if (online) {
+ if (e->flags & DF_PENDING)
+ snprintf(runstate, sizeof runstate, "pending|%zi",
+ (ssize_t)(e->nexttry - now));
+ else if (e->flags & DF_INFLIGHT)
+ snprintf(runstate, sizeof runstate, "inflight|%zi",
+ (ssize_t)(now - e->lasttry));
+ else
+ snprintf(runstate, sizeof runstate, "invalid|");
+ e->flags &= ~(DF_PENDING|DF_INFLIGHT);
+ }
+ else
+ strlcpy(runstate, "offline|", sizeof runstate);
+
if (e->flags)
errx(1, "%016" PRIx64 ": unexpected flags 0x%04x", e->id,
e->flags);
-
+
if (status[0])
status[strlen(status) - 1] = '\0';
- else
- strlcpy(status, "-", sizeof(status));
- switch (e->type) {
- case D_MDA:
- printf("mda");
- break;
- case D_MTA:
- printf("mta");
- break;
- case D_BOUNCE:
- printf("bounce");
- break;
- default:
- printf("unknown");
- }
+ if (e->type == D_MDA)
+ agent = "mda";
+ else if (e->type == D_MTA)
+ agent = "mta";
+ else if (e->type == D_BOUNCE)
+ agent = "bounce";
if (e->ss.ss_family == AF_LOCAL)
src = "local";
@@ -463,20 +591,25 @@ show_queue_envelope(struct envelope *e, int flags)
else if (e->ss.ss_family == AF_INET6)
src = "inet6";
- printf("|%016" PRIx64 "|%s|%s|%s@%s|%s@%s|%" PRId64 "|%" PRId64 "|%u",
+ printf("%016"PRIx64
+ "|%s|%s|%s|%s@%s|%s@%s|%s@%s"
+ "|%zu|%zu|%zu|%zu|%s|%s\n",
+
e->id,
+
src,
+ agent,
status,
e->sender.user, e->sender.domain,
+ e->rcpt.user, e->rcpt.domain,
e->dest.user, e->dest.domain,
- (int64_t) e->lasttry,
- (int64_t) e->expire,
- e->retry);
-
- if (e->errorline[0] != '\0')
- printf("|%s", e->errorline);
-
- printf("\n");
+
+ (size_t) e->creation,
+ (size_t) (e->creation + e->expire),
+ (size_t) e->lasttry,
+ (size_t) e->retry,
+ runstate,
+ e->errorline);
}
static void
diff --git a/usr.sbin/smtpd/smtpd.c b/usr.sbin/smtpd/smtpd.c
index ffb57be4e46..4b73879089d 100644
--- a/usr.sbin/smtpd/smtpd.c
+++ b/usr.sbin/smtpd/smtpd.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: smtpd.c,v 1.181 2012/11/12 14:58:53 eric Exp $ */
+/* $OpenBSD: smtpd.c,v 1.182 2012/11/20 09:47:46 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -1339,6 +1339,8 @@ imsg_to_str(int type)
CASE(IMSG_QUEUE_REMOVE);
CASE(IMSG_QUEUE_EXPIRE);
+ CASE(IMSG_SCHEDULER_MESSAGES);
+ CASE(IMSG_SCHEDULER_ENVELOPES);
CASE(IMSG_SCHEDULER_REMOVE);
CASE(IMSG_SCHEDULER_SCHEDULE);
diff --git a/usr.sbin/smtpd/smtpd.h b/usr.sbin/smtpd/smtpd.h
index 5c3b342bd9b..fbc1d230baf 100644
--- a/usr.sbin/smtpd/smtpd.h
+++ b/usr.sbin/smtpd/smtpd.h
@@ -1,4 +1,4 @@
-/* $OpenBSD: smtpd.h,v 1.395 2012/11/12 14:58:53 eric Exp $ */
+/* $OpenBSD: smtpd.h,v 1.396 2012/11/20 09:47:46 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -164,6 +164,8 @@ enum imsg_type {
IMSG_QUEUE_REMOVE,
IMSG_QUEUE_EXPIRE,
+ IMSG_SCHEDULER_MESSAGES,
+ IMSG_SCHEDULER_ENVELOPES,
IMSG_SCHEDULER_REMOVE,
IMSG_SCHEDULER_SCHEDULE,
@@ -337,7 +339,11 @@ enum delivery_status {
enum delivery_flags {
DF_AUTHENTICATED = 0x1,
DF_BOUNCE = 0x4,
- DF_INTERNAL = 0x8 /* internal expansion forward */
+ DF_INTERNAL = 0x8, /* internal expansion forward */
+
+ /* runstate, not saved on disk */
+ DF_PENDING = 0x10,
+ DF_INFLIGHT = 0x20,
};
struct delivery_mda {
@@ -419,6 +425,7 @@ struct envelope {
time_t expire;
uint16_t retry;
enum delivery_flags flags;
+ time_t nexttry;
};
enum envelope_field {
@@ -813,6 +820,13 @@ struct delivery_backend {
void (*open)(struct deliver *);
};
+/*
+ * Per-envelope state record.  The scheduler backend's envelopes() callback
+ * takes a pointer to these plus a size_t, so it presumably fills an array
+ * of them -- TODO confirm against the scheduler code.
+ * NOTE(review): the field notes below are inferred from names; verify.
+ */
+struct evpstate {
+ uint64_t evpid; /* envelope id */
+ uint16_t flags; /* presumably DF_PENDING / DF_INFLIGHT bits -- confirm */
+ uint16_t retry; /* presumably the envelope retry count -- confirm */
+ time_t time; /* meaning not visible here -- confirm against the producer */
+};
+
struct scheduler_info {
uint64_t evpid;
enum delivery_type type;
@@ -854,6 +868,8 @@ struct scheduler_backend {
void (*batch)(int, struct scheduler_batch *);
+ size_t (*messages)(uint32_t, uint32_t *, size_t);
+ size_t (*envelopes)(uint64_t, struct evpstate *, size_t);
void (*schedule)(uint64_t);
void (*remove)(uint64_t);
};
@@ -1152,6 +1168,7 @@ void *tree_xpop(struct tree *, uint64_t);
int tree_poproot(struct tree *, uint64_t *, void **);
int tree_root(struct tree *, uint64_t *, void **);
int tree_iter(struct tree *, void **, uint64_t *, void **);
+int tree_iterfrom(struct tree *, void **, uint64_t, uint64_t *, void **);
void tree_merge(struct tree *, struct tree *);
diff --git a/usr.sbin/smtpd/tree.c b/usr.sbin/smtpd/tree.c
index ce147e33c02..bc5a7b22149 100644
--- a/usr.sbin/smtpd/tree.c
+++ b/usr.sbin/smtpd/tree.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: tree.c,v 1.1 2012/08/07 21:47:58 eric Exp $ */
+/* $OpenBSD: tree.c,v 1.2 2012/11/20 09:47:46 eric Exp $ */
/*
* Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
@@ -193,6 +193,38 @@ tree_iter(struct tree *t, void **hdl, uint64_t *id, void **data)
return (0);
}
+/*
+ * Iterate over the tree in key order, starting from key "k".
+ *
+ * "*hdl" is the iteration handle and must be NULL on the first call.
+ * On that first call the walk starts at the smallest entry when k is 0,
+ * otherwise at the entry whose id is k, or, if there is none, at the entry
+ * that follows the position k would occupy.  On later calls it advances
+ * to the entry after "*hdl" and "k" is ignored.
+ *
+ * Returns 1 and stores the entry in "*hdl" (and its id / data in "*id" /
+ * "*data" when those pointers are non-NULL).  Returns 0, leaving the
+ * outputs untouched, when there is no such entry.
+ */
+int
+tree_iterfrom(struct tree *t, void **hdl, uint64_t k, uint64_t *id, void **data)
+{
+	struct treeentry	*entry, probe;
+
+	entry = *hdl;
+	if (entry != NULL)
+		entry = SPLAY_NEXT(tree, t, entry);
+	else if (k == 0)
+		entry = SPLAY_MIN(tree, t);
+	else {
+		probe.id = k;
+		entry = SPLAY_FIND(tree, t, &probe);
+		if (entry == NULL) {
+			/*
+			 * No exact match: temporarily link a stack-allocated
+			 * node with id k to locate its successor, then unlink
+			 * it again.
+			 */
+			SPLAY_INSERT(tree, t, &probe);
+			entry = SPLAY_NEXT(tree, t, &probe);
+			SPLAY_REMOVE(tree, t, &probe);
+		}
+	}
+
+	if (entry == NULL)
+		return (0);
+
+	*hdl = entry;
+	if (id)
+		*id = entry->id;
+	if (data)
+		*data = entry->data;
+	return (1);
+}
+
void
tree_merge(struct tree *dst, struct tree *src)
{