summaryrefslogtreecommitdiff
path: root/usr.sbin/smtpd/scheduler.c
diff options
context:
space:
mode:
authorEric Faurot <eric@cvs.openbsd.org>2012-11-20 09:47:47 +0000
committerEric Faurot <eric@cvs.openbsd.org>2012-11-20 09:47:47 +0000
commit65898a3024a49104a9e8123deec105b11de001e3 (patch)
tree657d34d40deed3c1977a7b942dc5e32505512802 /usr.sbin/smtpd/scheduler.c
parentf6aefaa94996adb3a94cf829d7d536f2457f186f (diff)
Allow "smtpctl show queue" to run in "online" mode if the smtpd server
is running. The scheduler sends the runtime state of each envelope to the queue process which loads the envelope, fills the runtime bits and sends the envelope back to the client. Iteration over the envelope set happens in small chunks to make the request interruptible and to allow the server to keep doing its job in the meantime. Adpat "smtpctl schedule-all" to schedule the messages one by one using the same iteration mechanism. Document "smtpctl monitor" and "smtpctl show queue". ok gilles@
Diffstat (limited to 'usr.sbin/smtpd/scheduler.c')
-rw-r--r--usr.sbin/smtpd/scheduler.c64
1 files changed, 45 insertions, 19 deletions
diff --git a/usr.sbin/smtpd/scheduler.c b/usr.sbin/smtpd/scheduler.c
index 44f0f15eec1..31cddeecaef 100644
--- a/usr.sbin/smtpd/scheduler.c
+++ b/usr.sbin/smtpd/scheduler.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: scheduler.c,v 1.23 2012/11/12 14:58:53 eric Exp $ */
+/* $OpenBSD: scheduler.c,v 1.24 2012/11/20 09:47:45 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -60,14 +60,18 @@ static struct scheduler_backend *backend = NULL;
extern const char *backend_scheduler;
+#define MSGBATCHSIZE 1024
+#define EVPBATCHSIZE 256
+
void
scheduler_imsg(struct imsgev *iev, struct imsg *imsg)
{
+ struct evpstate state[EVPBATCHSIZE];
struct envelope *e;
struct scheduler_info si;
uint64_t id;
- uint32_t msgid;
- size_t n;
+ uint32_t msgid, msgids[MSGBATCHSIZE];
+ size_t n, i;
switch (imsg->hdr.type) {
@@ -169,14 +173,33 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg)
log_verbose(*(int *)imsg->data);
return;
+ case IMSG_SCHEDULER_MESSAGES:
+ msgid = *(uint32_t *)(imsg->data);
+ n = backend->messages(msgid, msgids, MSGBATCHSIZE);
+ imsg_compose_event(iev, IMSG_SCHEDULER_MESSAGES,
+ imsg->hdr.peerid, 0, -1, msgids, n * sizeof (*msgids));
+ return;
+
+ case IMSG_SCHEDULER_ENVELOPES:
+ id = *(uint64_t *)(imsg->data);
+ n = backend->envelopes(id, state, EVPBATCHSIZE);
+ for (i = 0; i < n; i++) {
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE],
+ IMSG_SCHEDULER_ENVELOPES, imsg->hdr.peerid, 0, -1,
+ &state[i], sizeof state[i]);
+ }
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE],
+ IMSG_SCHEDULER_ENVELOPES, imsg->hdr.peerid, 0, -1, NULL, 0);
+ return;
+
case IMSG_SCHEDULER_SCHEDULE:
id = *(uint64_t *)(imsg->data);
- if (id == 0)
- log_debug("debug: scheduler: scheduling all envelopes");
- else if (id <= 0xffffffffL)
- log_debug("debug: scheduler: scheduling msg:%08" PRIx64, id);
+ if (id <= 0xffffffffL)
+ log_debug("debug: scheduler: "
+ "scheduling msg:%08" PRIx64, id);
else
- log_debug("debug: scheduler: scheduling evp:%016" PRIx64, id);
+ log_debug("debug: scheduler: "
+ "scheduling evp:%016" PRIx64, id);
backend->schedule(id);
scheduler_reset_events();
return;
@@ -184,15 +207,18 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg)
case IMSG_SCHEDULER_REMOVE:
id = *(uint64_t *)(imsg->data);
if (id <= 0xffffffffL)
- log_debug("debug: scheduler: removing msg:%08" PRIx64, id);
+ log_debug("debug: scheduler: "
+ "removing msg:%08" PRIx64, id);
else
- log_debug("debug: scheduler: removing evp:%016" PRIx64, id);
+ log_debug("debug: scheduler: "
+ "removing evp:%016" PRIx64, id);
backend->remove(id);
scheduler_reset_events();
return;
}
- errx(1, "scheduler_imsg: unexpected %s imsg", imsg_to_str(imsg->hdr.type));
+ errx(1, "scheduler_imsg: unexpected %s imsg",
+ imsg_to_str(imsg->hdr.type));
}
static void
@@ -349,7 +375,7 @@ scheduler_timeout(int fd, short event, void *p)
fatalx("scheduler_timeout: unknown batch type");
}
- evtimer_add(&env->sc_ev, &tv);
+ evtimer_add(&env->sc_ev, &tv);
}
static void
@@ -395,8 +421,8 @@ scheduler_process_bounce(struct scheduler_batch *batch)
while ((e = batch->evpids)) {
batch->evpids = e->next;
- log_debug("debug: scheduler: evp:%016" PRIx64 " scheduled (bounce)",
- e->id);
+ log_debug("debug: scheduler: evp:%016" PRIx64
+ " scheduled (bounce)", e->id);
imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_SMTP_ENQUEUE,
0, 0, -1, &e->id, sizeof e->id);
free(e);
@@ -412,8 +438,8 @@ scheduler_process_mda(struct scheduler_batch *batch)
while ((e = batch->evpids)) {
batch->evpids = e->next;
- log_debug("debug: scheduler: evp:%016" PRIx64 " scheduled (mda)",
- e->id);
+ log_debug("debug: scheduler: evp:%016" PRIx64
+ " scheduled (mda)", e->id);
imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_MDA_SESS_NEW,
0, 0, -1, &e->id, sizeof e->id);
free(e);
@@ -425,15 +451,15 @@ scheduler_process_mda(struct scheduler_batch *batch)
static void
scheduler_process_mta(struct scheduler_batch *batch)
{
- struct id_list *e;
+ struct id_list *e;
imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_BATCH_CREATE,
0, 0, -1, NULL, 0);
while ((e = batch->evpids)) {
batch->evpids = e->next;
- log_debug("debug: scheduler: evp:%016" PRIx64 " scheduled (mta)",
- e->id);
+ log_debug("debug: scheduler: evp:%016" PRIx64
+ " scheduled (mta)", e->id);
imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_BATCH_APPEND,
0, 0, -1, &e->id, sizeof e->id);
free(e);