path: root/usr.sbin/smtpd/scheduler.c
author	Eric Faurot <eric@cvs.openbsd.org>	2012-08-08 08:50:43 +0000
committer	Eric Faurot <eric@cvs.openbsd.org>	2012-08-08 08:50:43 +0000
commit	4fb538ff7fe692fc3494a906f82178b14bf48bf1 (patch)
tree	74c2a8e0bb894841c747e6bee85be58c05869135 /usr.sbin/smtpd/scheduler.c
parent	e653821d1952b330839a2b474720109458d1d375 (diff)
Improve the scheduler backend API.
New envelopes are pushed into the scheduler through the insert(), commit() and rollback() transactional interface functions. Worklists are pulled from the scheduler through a single batch() interface function, which returns a list of envelope ids and the type of processing. Envelopes returned in such a batch are said to be "in-flight", as opposed to "pending": they are expected to be processed in some way, and either updated() or deleted() at some point. The schedule()/remove() functions are used to alter the internal state of "pending" envelopes to make them schedulable; those envelopes will be part of a worklist on the next call to batch().

Rewrite the scheduler_ramqueue backend. The initial queue loading is now done by the queue.

ok gilles@
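To make the new API easier to follow in the diff below, here is a rough, illustrative sketch of the backend function table that the calls in this change imply (insert/commit/rollback, batch, update/delete, schedule/remove). The real declarations live in usr.sbin/smtpd/scheduler_backend.h; the exact types, member names and flag values may differ, so treat this as an assumption-laden reading of the calls visible in scheduler.c, not the actual header.

/*
 * Illustrative sketch only: the authoritative definitions are in
 * usr.sbin/smtpd/scheduler_backend.h and may differ in detail.
 */
#include <sys/types.h>
#include <time.h>

struct scheduler_info;			/* per-envelope data handed to the backend */

/* One entry of the worklist returned by batch(). */
struct id_list {
	u_int64_t	 id;
	struct id_list	*next;
};

/* A worklist: the type of processing plus the envelope ids concerned. */
struct scheduler_batch {
	int		 type;		/* SCHED_NONE, SCHED_DELAY, SCHED_MTA, ... */
	time_t		 delay;		/* seconds to sleep when type == SCHED_DELAY */
	struct id_list	*evpids;	/* these envelopes become "in-flight" */
};

struct scheduler_backend {
	/* transactional insertion of new envelopes, keyed by message id */
	void	(*insert)(struct scheduler_info *);
	void	(*commit)(u_int32_t msgid);
	void	(*rollback)(u_int32_t msgid);

	/* pull the next worklist of envelopes to process */
	void	(*batch)(int typemask, time_t now, struct scheduler_batch *);

	/* resolve "in-flight" envelopes */
	void	(*update)(struct scheduler_info *);	/* e.g. temporary failure, retry later */
	void	(*delete)(u_int64_t evpid);		/* delivered, bounced or dropped */

	/* alter the state of "pending" envelopes */
	void	(*schedule)(u_int64_t evpid);		/* make schedulable immediately */
	void	(*remove)(u_int64_t evpid);		/* mark for removal */
};

With a layout like this, the rewritten scheduler_timeout() below simply asks batch() for the next worklist, dispatches on batch.type, and later resolves each "in-flight" envelope with update() or delete() as delivery results come back from the queue process.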
Diffstat (limited to 'usr.sbin/smtpd/scheduler.c')
-rw-r--r--	usr.sbin/smtpd/scheduler.c	454
1 files changed, 174 insertions, 280 deletions
diff --git a/usr.sbin/smtpd/scheduler.c b/usr.sbin/smtpd/scheduler.c
index 490e4db546e..9efaf8029a4 100644
--- a/usr.sbin/smtpd/scheduler.c
+++ b/usr.sbin/smtpd/scheduler.c
@@ -1,9 +1,10 @@
-/* $OpenBSD: scheduler.c,v 1.7 2012/07/18 22:04:49 eric Exp $ */
+/* $OpenBSD: scheduler.c,v 1.8 2012/08/08 08:50:42 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
* Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
* Copyright (c) 2008-2009 Jacek Masiulaniec <jacekm@dobremiasto.net>
+ * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
@@ -47,17 +48,18 @@
static void scheduler_imsg(struct imsgev *, struct imsg *);
static void scheduler_shutdown(void);
static void scheduler_sig_handler(int, short, void *);
-static void scheduler_setup_events(void);
static void scheduler_reset_events(void);
-static void scheduler_disable_events(void);
static void scheduler_timeout(int, short, void *);
-static void scheduler_remove(u_int64_t);
-static void scheduler_remove_envelope(u_int64_t);
-static int scheduler_process_envelope(u_int64_t);
-static int scheduler_process_batch(enum delivery_type, u_int64_t);
-static int scheduler_check_loop(struct envelope *);
+static void scheduler_process_remove(struct scheduler_batch *);
+static void scheduler_process_expire(struct scheduler_batch *);
+static void scheduler_process_bounce(struct scheduler_batch *);
+static void scheduler_process_mda(struct scheduler_batch *);
+static void scheduler_process_mta(struct scheduler_batch *);
static int scheduler_load_message(u_int32_t);
+void scheduler_envelope_update(struct envelope *);
+void scheduler_envelope_delete(struct envelope *);
+
static struct scheduler_backend *backend = NULL;
extern const char *backend_scheduler;
@@ -84,8 +86,9 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg)
e = imsg->data;
log_trace(TRACE_SCHEDULER,
"scheduler: IMSG_QUEUE_DELIVERY_OK: %016"PRIx64, e->id);
- backend->remove(e->id);
+ backend->delete(e->id);
queue_envelope_delete(e);
+ scheduler_reset_events();
return;
case IMSG_QUEUE_DELIVERY_TEMPFAIL:
@@ -96,7 +99,7 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg)
e->retry++;
queue_envelope_update(e);
scheduler_info(&si, e);
- backend->insert(&si);
+ backend->update(&si);
scheduler_reset_events();
return;
@@ -109,10 +112,11 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg)
bounce_record_message(e, &bounce);
scheduler_info(&si, &bounce);
backend->insert(&si);
- scheduler_reset_events();
+ backend->commit(evpid_to_msgid(bounce.id));
}
- backend->remove(e->id);
+ backend->delete(e->id);
queue_envelope_delete(e);
+ scheduler_reset_events();
return;
case IMSG_MDA_SESS_NEW:
@@ -138,7 +142,7 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg)
if (imsg->fd < 0 || !bounce_session(imsg->fd, e)) {
queue_envelope_update(e);
scheduler_info(&si, e);
- backend->insert(&si);
+ backend->update(&si);
scheduler_reset_events();
return;
}
@@ -175,24 +179,23 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg)
log_trace(TRACE_SCHEDULER,
"scheduler: IMSG_SCHEDULER_SCHEDULE: %016"PRIx64,
*(u_int64_t *)imsg->data);
- backend->force(*(u_int64_t *)imsg->data);
- scheduler_reset_events();
+ backend->schedule(*(u_int64_t *)imsg->data);
+ scheduler_reset_events();
return;
case IMSG_SCHEDULER_REMOVE:
log_trace(TRACE_SCHEDULER,
"scheduler: IMSG_SCHEDULER_REMOVE: %016"PRIx64,
*(u_int64_t *)imsg->data);
- scheduler_remove(*(u_int64_t *)imsg->data);
+ backend->remove(*(u_int64_t *)imsg->data);
scheduler_reset_events();
return;
-
}
errx(1, "scheduler_imsg: unexpected %s imsg", imsg_to_str(imsg->hdr.type));
}
-void
+static void
scheduler_sig_handler(int sig, short event, void *p)
{
switch (sig) {
@@ -205,25 +208,14 @@ scheduler_sig_handler(int sig, short event, void *p)
}
}
-void
+static void
scheduler_shutdown(void)
{
log_info("scheduler handler exiting");
_exit(0);
}
-void
-scheduler_setup_events(void)
-{
- struct timeval tv;
-
- evtimer_set(&env->sc_ev, scheduler_timeout, NULL);
- tv.tv_sec = 0;
- tv.tv_usec = 10;
- evtimer_add(&env->sc_ev, &tv);
-}
-
-void
+static void
scheduler_reset_events(void)
{
struct timeval tv;
@@ -233,12 +225,6 @@ scheduler_reset_events(void)
evtimer_add(&env->sc_ev, &tv);
}
-void
-scheduler_disable_events(void)
-{
- evtimer_del(&env->sc_ev);
-}
-
pid_t
scheduler(void)
{
@@ -304,200 +290,198 @@ scheduler(void)
config_pipes(peers, nitems(peers));
config_peers(peers, nitems(peers));
- scheduler_setup_events();
+ evtimer_set(&env->sc_ev, scheduler_timeout, NULL);
+ scheduler_reset_events();
event_dispatch();
- scheduler_disable_events();
scheduler_shutdown();
return (0);
}
-void
+static void
scheduler_timeout(int fd, short event, void *p)
{
- time_t nsched;
- time_t curtm;
- u_int64_t evpid;
- static int setup = 0;
- int delay = 0;
- struct timeval tv;
-
- log_trace(TRACE_SCHEDULER, "scheduler: entering scheduler_timeout");
-
- /* if we're not done setting up the scheduler, do it some more */
- if (! setup)
- setup = backend->setup();
-
- /* we don't have a schedulable envelope ... sleep */
- if (! backend->next(&evpid, &nsched))
- goto scheduler_sleep;
-
- /* is the envelope schedulable right away ? */
- curtm = time(NULL);
- if (nsched <= curtm) {
- /* yup */
- scheduler_process_envelope(evpid);
- }
- else {
- /* nope, so we can either keep the timeout delay to 0 if we
- * are not done setting up the scheduler, or sleep until it
- * is time to schedule that envelope otherwise.
- */
- if (setup)
- delay = nsched - curtm;
- }
+ struct timeval tv;
+ struct scheduler_batch batch;
+ int typemask;
- if (delay)
- log_trace(TRACE_SCHEDULER, "scheduler: pausing for %d seconds",
- delay);
- tv.tv_sec = delay;
- tv.tv_usec = 0;
- evtimer_add(&env->sc_ev, &tv);
- return;
+ log_trace(TRACE_SCHEDULER, "scheduler: getting next batch");
-scheduler_sleep:
- log_trace(TRACE_SCHEDULER, "scheduler: sleeping");
- return;
-}
-
-static int
-scheduler_process_envelope(u_int64_t evpid)
-{
- struct envelope envelope;
- size_t mta_av, mda_av, bnc_av;
- struct scheduler_info si;
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
- mta_av = env->sc_maxconn - stat_get(STATS_MTA_SESSION, STAT_ACTIVE);
- mda_av = env->sc_maxconn - stat_get(STATS_MDA_SESSION, STAT_ACTIVE);
- bnc_av = env->sc_maxconn - stat_get(STATS_SCHEDULER_BOUNCES, STAT_ACTIVE);
+ typemask = SCHED_REMOVE | SCHED_EXPIRE | SCHED_BOUNCE;
+ if (!(env->sc_flags & SMTPD_MDA_PAUSED))
+ typemask |= SCHED_MDA;
+ if (!(env->sc_flags & SMTPD_MTA_PAUSED))
+ typemask |= SCHED_MTA;
- if (! queue_envelope_load(evpid, &envelope))
- return 0;
+ backend->batch(typemask, time(NULL), &batch);
+ switch (batch.type) {
+ case SCHED_NONE:
+ log_trace(TRACE_SCHEDULER, "scheduler: sleeping");
+ return;
- if (envelope.type == D_MDA)
- if (mda_av == 0) {
- env->sc_flags |= SMTPD_MDA_BUSY;
- return 0;
- }
+ case SCHED_DELAY:
+ tv.tv_sec = batch.delay;
+ log_trace(TRACE_SCHEDULER,
+ "scheduler: pausing for %li seconds", tv.tv_sec);
+ break;
- if (envelope.type == D_MTA)
- if (mta_av == 0) {
- env->sc_flags |= SMTPD_MTA_BUSY;
- return 0;
- }
+ case SCHED_REMOVE:
+ scheduler_process_remove(&batch);
+ break;
- if (envelope.type == D_BOUNCE)
- if (bnc_av == 0) {
- env->sc_flags |= SMTPD_BOUNCE_BUSY;
- return 0;
- }
+ case SCHED_EXPIRE:
+ scheduler_process_expire(&batch);
+ break;
- if (scheduler_check_loop(&envelope)) {
- struct envelope bounce;
+ case SCHED_BOUNCE:
+ scheduler_process_bounce(&batch);
+ break;
- envelope_set_errormsg(&envelope, "loop has been detected");
- if (bounce_record_message(&envelope, &bounce)) {
- scheduler_info(&si, &bounce);
- backend->insert(&si);
- }
- backend->remove(evpid);
- queue_envelope_delete(&envelope);
+ case SCHED_MDA:
+ scheduler_process_mda(&batch);
+ break;
- scheduler_reset_events();
+ case SCHED_MTA:
+ scheduler_process_mta(&batch);
+ break;
- return 0;
+ default:
+ fatalx("scheduler_timeout: unknown batch type");
}
+ evtimer_add(&env->sc_ev, &tv);
+}
- return scheduler_process_batch(envelope.type, evpid);
+static void
+scheduler_process_remove(struct scheduler_batch *batch)
+{
+ struct envelope evp;
+ struct id_list *e;
+
+ while ((e = batch->evpids)) {
+ batch->evpids = e->next;
+ log_debug("scheduler: deleting evp:%016" PRIx64 " (removed)",
+ e->id);
+ evp.id = e->id;
+ queue_envelope_delete(&evp);
+ free(e);
+ }
}
-static int
-scheduler_process_batch(enum delivery_type type, u_int64_t evpid)
+static void
+scheduler_process_expire(struct scheduler_batch *batch)
{
struct envelope evp;
- void *batch;
- int fd;
-
- batch = backend->batch(evpid);
- switch (type) {
- case D_BOUNCE:
- while (backend->fetch(batch, &evpid)) {
- if (! queue_envelope_load(evpid, &evp))
- goto end;
-
- evp.lasttry = time(NULL);
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_SMTP_ENQUEUE, PROC_SMTP, 0, -1, &evp,
- sizeof evp);
- backend->schedule(evpid);
- }
+ struct id_list *e;
+
+ while ((e = batch->evpids)) {
+ batch->evpids = e->next;
+ log_debug("scheduler: deleting evp:%016" PRIx64 " (expire)",
+ e->id);
+ evp.id = e->id;
+ queue_envelope_delete(&evp);
+ free(e);
+ }
+}
+
+static void
+scheduler_process_bounce(struct scheduler_batch *batch)
+{
+ struct envelope evp;
+ struct id_list *e;
+
+ while ((e = batch->evpids)) {
+ batch->evpids = e->next;
+ log_debug("scheduler: scheduling evp:%016" PRIx64 " (bounce)",
+ e->id);
+ queue_envelope_load(e->id, &evp);
+ evp.lasttry = time(NULL);
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE],
+ IMSG_SMTP_ENQUEUE, PROC_SMTP, 0, -1, &evp,
+ sizeof evp);
stat_increment(STATS_SCHEDULER);
stat_increment(STATS_SCHEDULER_BOUNCES);
- break;
-
- case D_MDA:
- backend->fetch(batch, &evpid);
- if (! queue_envelope_load(evpid, &evp))
- goto end;
-
+ free(e);
+ }
+}
+
+static void
+scheduler_process_mda(struct scheduler_batch *batch)
+{
+ struct envelope evp;
+ struct id_list *e;
+ int fd;
+
+ while ((e = batch->evpids)) {
+ batch->evpids = e->next;
+ log_debug("scheduler: scheduling evp:%016" PRIx64 " (mda)",
+ e->id);
+ queue_envelope_load(e->id, &evp);
evp.lasttry = time(NULL);
- fd = queue_message_fd_r(evpid_to_msgid(evpid));
+ fd = queue_message_fd_r(evpid_to_msgid(evp.id));
imsg_compose_event(env->sc_ievs[PROC_QUEUE],
IMSG_MDA_SESS_NEW, PROC_MDA, 0, fd, &evp,
sizeof evp);
- backend->schedule(evpid);
-
stat_increment(STATS_SCHEDULER);
stat_increment(STATS_MDA_SESSION);
- break;
+ free(e);
+ }
+}
- case D_MTA: {
- struct mta_batch mta_batch;
+static void
+scheduler_process_mta(struct scheduler_batch *batch)
+{
+ struct envelope evp;
+ struct mta_batch mta_batch;
+ struct id_list *e;
- /* FIXME */
- if (! backend->fetch(batch, &evpid))
- goto end;
- if (! queue_envelope_load(evpid, &evp))
- goto end;
+ queue_envelope_load(batch->evpids->id, &evp);
- bzero(&mta_batch, sizeof mta_batch);
- mta_batch.id = arc4random();
- mta_batch.relay = evp.agent.mta.relay;
+ bzero(&mta_batch, sizeof mta_batch);
+ mta_batch.id = arc4random();
+ mta_batch.relay = evp.agent.mta.relay;
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_BATCH_CREATE, PROC_MTA, 0, -1, &mta_batch,
- sizeof mta_batch);
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE],
+ IMSG_BATCH_CREATE, PROC_MTA, 0, -1, &mta_batch,
+ sizeof mta_batch);
- while (backend->fetch(batch, &evpid)) {
- if (! queue_envelope_load(evpid, &evp))
- goto end;
- evp.lasttry = time(NULL); /* FIXME */
- evp.batch_id = mta_batch.id;
+ while ((e = batch->evpids)) {
+ batch->evpids = e->next;
+ log_debug("scheduler: scheduling evp:%016" PRIx64 " (mta)",
+ e->id);
+ queue_envelope_load(e->id, &evp);
+ evp.lasttry = time(NULL);
+ evp.batch_id = mta_batch.id;
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_BATCH_APPEND,
+ PROC_MTA, 0, -1, &evp, sizeof evp);
+ free(e);
+ stat_increment(STATS_SCHEDULER);
+ }
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_BATCH_APPEND, PROC_MTA, 0, -1, &evp,
- sizeof evp);
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_BATCH_CLOSE,
+ PROC_MTA, 0, -1, &mta_batch, sizeof mta_batch);
- backend->schedule(evpid);
- stat_increment(STATS_SCHEDULER);
- }
+ stat_increment(STATS_MTA_SESSION);
+}
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_BATCH_CLOSE, PROC_MTA, 0, -1, &mta_batch,
- sizeof mta_batch);
+void
+scheduler_envelope_update(struct envelope *e)
+{
+ struct scheduler_info si;
- stat_increment(STATS_MTA_SESSION);
- break;
- }
-
- default:
- fatalx("scheduler_process_batchqueue: unknown type");
- }
+ scheduler_info(&si, e);
+ backend->update(&si);
+ scheduler_reset_events();
+}
-end:
- backend->close(batch);
- return 1;
+void
+scheduler_envelope_delete(struct envelope *e)
+{
+ backend->delete(e->id);
+ scheduler_reset_events();
}
static int
@@ -506,7 +490,7 @@ scheduler_load_message(u_int32_t msgid)
struct qwalk *q;
u_int64_t evpid;
struct envelope envelope;
- struct scheduler_info si;
+ struct scheduler_info si;
q = qwalk_new(msgid);
while (qwalk(q, &evpid)) {
@@ -515,98 +499,8 @@ scheduler_load_message(u_int32_t msgid)
scheduler_info(&si, &envelope);
backend->insert(&si);
}
- qwalk_close(q);
+ qwalk_close(q);
+ backend->commit(msgid);
return 1;
}
-
-static int
-scheduler_check_loop(struct envelope *ep)
-{
- int fd;
- FILE *fp;
- char *buf, *lbuf;
- size_t len;
- struct mailaddr maddr;
- int ret = 0;
- int rcvcount = 0;
-
- fd = queue_message_fd_r(evpid_to_msgid(ep->id));
- if ((fp = fdopen(fd, "r")) == NULL)
- fatal("fdopen");
-
- lbuf = NULL;
- while ((buf = fgetln(fp, &len))) {
- if (buf[len - 1] == '\n')
- buf[len - 1] = '\0';
- else {
- /* EOF without EOL, copy and add the NUL */
- if ((lbuf = malloc(len + 1)) == NULL)
- err(1, NULL);
- memcpy(lbuf, buf, len);
- lbuf[len] = '\0';
- buf = lbuf;
- }
-
- if (strchr(buf, ':') == NULL && !isspace((int)*buf))
- break;
-
- if (strncasecmp("Received: ", buf, 10) == 0) {
- rcvcount++;
- if (rcvcount == MAX_HOPS_COUNT) {
- ret = 1;
- break;
- }
- }
-
- else if (strncasecmp("Delivered-To: ", buf, 14) == 0) {
- struct mailaddr dest;
-
- bzero(&maddr, sizeof (struct mailaddr));
- if (! email_to_mailaddr(&maddr, buf + 14))
- continue;
-
- dest = ep->dest;
- if (ep->type == D_BOUNCE)
- dest = ep->sender;
-
- if (strcasecmp(maddr.user, dest.user) == 0 &&
- strcasecmp(maddr.domain, dest.domain) == 0) {
- ret = 1;
- break;
- }
- }
- }
- free(lbuf);
-
- fclose(fp);
- return ret;
-}
-
-static void
-scheduler_remove(u_int64_t id)
-{
- void *msg;
-
- /* removing by evpid */
- if (id > 0xffffffffL) {
- scheduler_remove_envelope(id);
- return;
- }
-
- /* removing by msgid */
- msg = backend->message(id);
- while (backend->fetch(msg, &id))
- scheduler_remove_envelope(id);
- backend->close(msg);
-}
-
-static void
-scheduler_remove_envelope(u_int64_t evpid)
-{
- struct envelope evp;
-
- evp.id = evpid;
- queue_envelope_delete(&evp);
- backend->remove(evpid);
-}