summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEric Faurot <eric@cvs.openbsd.org>2012-08-08 08:50:43 +0000
committerEric Faurot <eric@cvs.openbsd.org>2012-08-08 08:50:43 +0000
commit4fb538ff7fe692fc3494a906f82178b14bf48bf1 (patch)
tree74c2a8e0bb894841c747e6bee85be58c05869135
parente653821d1952b330839a2b474720109458d1d375 (diff)
Improve the scheduler backend API.
New envelopes are pushed into the scheduler through the insert() commit() rollback() transactional interface functions. Worklists are pulled from the scheduler through a single batch() interface function, which returns a list of envelope ids and the type of processing. Envelopes returned in this batch are said to be "in-flight", as opposed to "pending". They are supposed to be processed in some way, and either updated() or deleted() at some point. The schedule()/remove() functions are used to alter the internal state of "pending" envelopes to make them schedulable. The envelopes will be part of a worklist on the next call to batch(). Rewrite the scheduler_ramqueue backend. The initial queue loading is now done by the queue. ok gilles@
-rw-r--r--usr.sbin/smtpd/bounce.c8
-rw-r--r--usr.sbin/smtpd/queue.c58
-rw-r--r--usr.sbin/smtpd/scheduler.c454
-rw-r--r--usr.sbin/smtpd/scheduler_backend.c23
-rw-r--r--usr.sbin/smtpd/scheduler_ramqueue.c1199
-rw-r--r--usr.sbin/smtpd/smtpd.h55
-rw-r--r--usr.sbin/smtpd/util.c47
7 files changed, 896 insertions, 948 deletions
diff --git a/usr.sbin/smtpd/bounce.c b/usr.sbin/smtpd/bounce.c
index c2147d12297..f31b322ef29 100644
--- a/usr.sbin/smtpd/bounce.c
+++ b/usr.sbin/smtpd/bounce.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: bounce.c,v 1.42 2012/07/09 09:57:53 gilles Exp $ */
+/* $OpenBSD: bounce.c,v 1.43 2012/08/08 08:50:42 eric Exp $ */
/*
* Copyright (c) 2009 Gilles Chehade <gilles@openbsd.org>
@@ -60,6 +60,10 @@ struct bounce {
struct io io;
};
+/* XXX remove later */
+void scheduler_envelope_update(struct envelope *);
+void scheduler_envelope_delete(struct envelope *);
+
static void bounce_send(struct bounce *, const char *, ...);
static int bounce_next(struct bounce *);
static void bounce_status(struct bounce *, const char *, ...);
@@ -210,10 +214,12 @@ bounce_status(struct bounce *bounce, const char *fmt, ...)
log_debug("#### %s: queue_envelope_delete: %016" PRIx64,
__func__, bounce->evp.id);
queue_envelope_delete(&bounce->evp);
+ scheduler_envelope_delete(&bounce->evp);
} else {
bounce->evp.retry++;
envelope_set_errormsg(&bounce->evp, "%s", status);
queue_envelope_update(&bounce->evp);
+ scheduler_envelope_update(&bounce->evp);
}
bounce->evp.id = 0;
free(status);
diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c
index d0e9549607b..c8c2d31403c 100644
--- a/usr.sbin/smtpd/queue.c
+++ b/usr.sbin/smtpd/queue.c
@@ -1,8 +1,9 @@
-/* $OpenBSD: queue.c,v 1.121 2012/07/09 09:57:53 gilles Exp $ */
+/* $OpenBSD: queue.c,v 1.122 2012/08/08 08:50:42 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
* Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
+ * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
@@ -39,6 +40,7 @@
#include "log.h"
static void queue_imsg(struct imsgev *, struct imsg *);
+static void queue_timeout(int, short, void *);
static void queue_pass_to_scheduler(struct imsgev *, struct imsg *);
static void queue_shutdown(void);
static void queue_sig_handler(int, short, void *);
@@ -224,7 +226,8 @@ queue(void)
{
pid_t pid;
struct passwd *pw;
-
+ struct timeval tv;
+ struct event ev_qload;
struct event ev_sigint;
struct event ev_sigterm;
@@ -286,6 +289,12 @@ queue(void)
config_pipes(peers, nitems(peers));
config_peers(peers, nitems(peers));
+ /* setup queue loading task */
+ evtimer_set(&ev_qload, queue_timeout, &ev_qload);
+ tv.tv_sec = 0;
+ tv.tv_usec = 10;
+ evtimer_add(&ev_qload, &tv);
+
if (event_dispatch() < 0)
fatal("event_dispatch");
queue_shutdown();
@@ -294,6 +303,51 @@ queue(void)
}
void
+queue_timeout(int fd, short event, void *p)
+{
+ static struct qwalk *q = NULL;
+ struct event *ev = p;
+ static uint64_t last_evpid = 0;
+ struct envelope envelope;
+ struct timeval tv;
+ uint64_t evpid;
+
+ if (q == NULL) {
+ log_info("queue: loading queue into scheduler");
+ q = qwalk_new(0);
+ }
+
+ while (qwalk(q, &evpid)) {
+ if (! queue_envelope_load(evpid, &envelope))
+ continue;
+
+ if (evpid_to_msgid(evpid) != evpid_to_msgid(last_evpid) &&
+ last_evpid != 0) {
+ envelope.id = last_evpid;
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &envelope,
+ sizeof envelope);
+ }
+
+ last_evpid = evpid;
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+ evtimer_add(ev, &tv);
+ return;
+ }
+
+ if (last_evpid) {
+ envelope.id = last_evpid;
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &envelope,
+ sizeof envelope);
+ }
+
+ log_info("queue: done loading queue into scheduler");
+ qwalk_close(q);
+}
+
+void
queue_submit_envelope(struct envelope *ep)
{
imsg_compose_event(env->sc_ievs[PROC_QUEUE],
diff --git a/usr.sbin/smtpd/scheduler.c b/usr.sbin/smtpd/scheduler.c
index 490e4db546e..9efaf8029a4 100644
--- a/usr.sbin/smtpd/scheduler.c
+++ b/usr.sbin/smtpd/scheduler.c
@@ -1,9 +1,10 @@
-/* $OpenBSD: scheduler.c,v 1.7 2012/07/18 22:04:49 eric Exp $ */
+/* $OpenBSD: scheduler.c,v 1.8 2012/08/08 08:50:42 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
* Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
* Copyright (c) 2008-2009 Jacek Masiulaniec <jacekm@dobremiasto.net>
+ * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
@@ -47,17 +48,18 @@
static void scheduler_imsg(struct imsgev *, struct imsg *);
static void scheduler_shutdown(void);
static void scheduler_sig_handler(int, short, void *);
-static void scheduler_setup_events(void);
static void scheduler_reset_events(void);
-static void scheduler_disable_events(void);
static void scheduler_timeout(int, short, void *);
-static void scheduler_remove(u_int64_t);
-static void scheduler_remove_envelope(u_int64_t);
-static int scheduler_process_envelope(u_int64_t);
-static int scheduler_process_batch(enum delivery_type, u_int64_t);
-static int scheduler_check_loop(struct envelope *);
+static void scheduler_process_remove(struct scheduler_batch *);
+static void scheduler_process_expire(struct scheduler_batch *);
+static void scheduler_process_bounce(struct scheduler_batch *);
+static void scheduler_process_mda(struct scheduler_batch *);
+static void scheduler_process_mta(struct scheduler_batch *);
static int scheduler_load_message(u_int32_t);
+void scheduler_envelope_update(struct envelope *);
+void scheduler_envelope_delete(struct envelope *);
+
static struct scheduler_backend *backend = NULL;
extern const char *backend_scheduler;
@@ -84,8 +86,9 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg)
e = imsg->data;
log_trace(TRACE_SCHEDULER,
"scheduler: IMSG_QUEUE_DELIVERY_OK: %016"PRIx64, e->id);
- backend->remove(e->id);
+ backend->delete(e->id);
queue_envelope_delete(e);
+ scheduler_reset_events();
return;
case IMSG_QUEUE_DELIVERY_TEMPFAIL:
@@ -96,7 +99,7 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg)
e->retry++;
queue_envelope_update(e);
scheduler_info(&si, e);
- backend->insert(&si);
+ backend->update(&si);
scheduler_reset_events();
return;
@@ -109,10 +112,11 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg)
bounce_record_message(e, &bounce);
scheduler_info(&si, &bounce);
backend->insert(&si);
- scheduler_reset_events();
+ backend->commit(evpid_to_msgid(bounce.id));
}
- backend->remove(e->id);
+ backend->delete(e->id);
queue_envelope_delete(e);
+ scheduler_reset_events();
return;
case IMSG_MDA_SESS_NEW:
@@ -138,7 +142,7 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg)
if (imsg->fd < 0 || !bounce_session(imsg->fd, e)) {
queue_envelope_update(e);
scheduler_info(&si, e);
- backend->insert(&si);
+ backend->update(&si);
scheduler_reset_events();
return;
}
@@ -175,24 +179,23 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg)
log_trace(TRACE_SCHEDULER,
"scheduler: IMSG_SCHEDULER_SCHEDULE: %016"PRIx64,
*(u_int64_t *)imsg->data);
- backend->force(*(u_int64_t *)imsg->data);
- scheduler_reset_events();
+ backend->schedule(*(u_int64_t *)imsg->data);
+ scheduler_reset_events();
return;
case IMSG_SCHEDULER_REMOVE:
log_trace(TRACE_SCHEDULER,
"scheduler: IMSG_SCHEDULER_REMOVE: %016"PRIx64,
*(u_int64_t *)imsg->data);
- scheduler_remove(*(u_int64_t *)imsg->data);
+ backend->remove(*(u_int64_t *)imsg->data);
scheduler_reset_events();
return;
-
}
errx(1, "scheduler_imsg: unexpected %s imsg", imsg_to_str(imsg->hdr.type));
}
-void
+static void
scheduler_sig_handler(int sig, short event, void *p)
{
switch (sig) {
@@ -205,25 +208,14 @@ scheduler_sig_handler(int sig, short event, void *p)
}
}
-void
+static void
scheduler_shutdown(void)
{
log_info("scheduler handler exiting");
_exit(0);
}
-void
-scheduler_setup_events(void)
-{
- struct timeval tv;
-
- evtimer_set(&env->sc_ev, scheduler_timeout, NULL);
- tv.tv_sec = 0;
- tv.tv_usec = 10;
- evtimer_add(&env->sc_ev, &tv);
-}
-
-void
+static void
scheduler_reset_events(void)
{
struct timeval tv;
@@ -233,12 +225,6 @@ scheduler_reset_events(void)
evtimer_add(&env->sc_ev, &tv);
}
-void
-scheduler_disable_events(void)
-{
- evtimer_del(&env->sc_ev);
-}
-
pid_t
scheduler(void)
{
@@ -304,200 +290,198 @@ scheduler(void)
config_pipes(peers, nitems(peers));
config_peers(peers, nitems(peers));
- scheduler_setup_events();
+ evtimer_set(&env->sc_ev, scheduler_timeout, NULL);
+ scheduler_reset_events();
event_dispatch();
- scheduler_disable_events();
scheduler_shutdown();
return (0);
}
-void
+static void
scheduler_timeout(int fd, short event, void *p)
{
- time_t nsched;
- time_t curtm;
- u_int64_t evpid;
- static int setup = 0;
- int delay = 0;
- struct timeval tv;
-
- log_trace(TRACE_SCHEDULER, "scheduler: entering scheduler_timeout");
-
- /* if we're not done setting up the scheduler, do it some more */
- if (! setup)
- setup = backend->setup();
-
- /* we don't have a schedulable envelope ... sleep */
- if (! backend->next(&evpid, &nsched))
- goto scheduler_sleep;
-
- /* is the envelope schedulable right away ? */
- curtm = time(NULL);
- if (nsched <= curtm) {
- /* yup */
- scheduler_process_envelope(evpid);
- }
- else {
- /* nope, so we can either keep the timeout delay to 0 if we
- * are not done setting up the scheduler, or sleep until it
- * is time to schedule that envelope otherwise.
- */
- if (setup)
- delay = nsched - curtm;
- }
+ struct timeval tv;
+ struct scheduler_batch batch;
+ int typemask;
- if (delay)
- log_trace(TRACE_SCHEDULER, "scheduler: pausing for %d seconds",
- delay);
- tv.tv_sec = delay;
- tv.tv_usec = 0;
- evtimer_add(&env->sc_ev, &tv);
- return;
+ log_trace(TRACE_SCHEDULER, "scheduler: getting next batch");
-scheduler_sleep:
- log_trace(TRACE_SCHEDULER, "scheduler: sleeping");
- return;
-}
-
-static int
-scheduler_process_envelope(u_int64_t evpid)
-{
- struct envelope envelope;
- size_t mta_av, mda_av, bnc_av;
- struct scheduler_info si;
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
- mta_av = env->sc_maxconn - stat_get(STATS_MTA_SESSION, STAT_ACTIVE);
- mda_av = env->sc_maxconn - stat_get(STATS_MDA_SESSION, STAT_ACTIVE);
- bnc_av = env->sc_maxconn - stat_get(STATS_SCHEDULER_BOUNCES, STAT_ACTIVE);
+ typemask = SCHED_REMOVE | SCHED_EXPIRE | SCHED_BOUNCE;
+ if (!(env->sc_flags & SMTPD_MDA_PAUSED))
+ typemask |= SCHED_MDA;
+ if (!(env->sc_flags & SMTPD_MTA_PAUSED))
+ typemask |= SCHED_MTA;
- if (! queue_envelope_load(evpid, &envelope))
- return 0;
+ backend->batch(typemask, time(NULL), &batch);
+ switch (batch.type) {
+ case SCHED_NONE:
+ log_trace(TRACE_SCHEDULER, "scheduler: sleeping");
+ return;
- if (envelope.type == D_MDA)
- if (mda_av == 0) {
- env->sc_flags |= SMTPD_MDA_BUSY;
- return 0;
- }
+ case SCHED_DELAY:
+ tv.tv_sec = batch.delay;
+ log_trace(TRACE_SCHEDULER,
+ "scheduler: pausing for %li seconds", tv.tv_sec);
+ break;
- if (envelope.type == D_MTA)
- if (mta_av == 0) {
- env->sc_flags |= SMTPD_MTA_BUSY;
- return 0;
- }
+ case SCHED_REMOVE:
+ scheduler_process_remove(&batch);
+ break;
- if (envelope.type == D_BOUNCE)
- if (bnc_av == 0) {
- env->sc_flags |= SMTPD_BOUNCE_BUSY;
- return 0;
- }
+ case SCHED_EXPIRE:
+ scheduler_process_expire(&batch);
+ break;
- if (scheduler_check_loop(&envelope)) {
- struct envelope bounce;
+ case SCHED_BOUNCE:
+ scheduler_process_bounce(&batch);
+ break;
- envelope_set_errormsg(&envelope, "loop has been detected");
- if (bounce_record_message(&envelope, &bounce)) {
- scheduler_info(&si, &bounce);
- backend->insert(&si);
- }
- backend->remove(evpid);
- queue_envelope_delete(&envelope);
+ case SCHED_MDA:
+ scheduler_process_mda(&batch);
+ break;
- scheduler_reset_events();
+ case SCHED_MTA:
+ scheduler_process_mta(&batch);
+ break;
- return 0;
+ default:
+ fatalx("scheduler_timeout: unknown batch type");
}
+ evtimer_add(&env->sc_ev, &tv);
+}
- return scheduler_process_batch(envelope.type, evpid);
+static void
+scheduler_process_remove(struct scheduler_batch *batch)
+{
+ struct envelope evp;
+ struct id_list *e;
+
+ while ((e = batch->evpids)) {
+ batch->evpids = e->next;
+ log_debug("scheduler: deleting evp:%016" PRIx64 " (removed)",
+ e->id);
+ evp.id = e->id;
+ queue_envelope_delete(&evp);
+ free(e);
+ }
}
-static int
-scheduler_process_batch(enum delivery_type type, u_int64_t evpid)
+static void
+scheduler_process_expire(struct scheduler_batch *batch)
{
struct envelope evp;
- void *batch;
- int fd;
-
- batch = backend->batch(evpid);
- switch (type) {
- case D_BOUNCE:
- while (backend->fetch(batch, &evpid)) {
- if (! queue_envelope_load(evpid, &evp))
- goto end;
-
- evp.lasttry = time(NULL);
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_SMTP_ENQUEUE, PROC_SMTP, 0, -1, &evp,
- sizeof evp);
- backend->schedule(evpid);
- }
+ struct id_list *e;
+
+ while ((e = batch->evpids)) {
+ batch->evpids = e->next;
+ log_debug("scheduler: deleting evp:%016" PRIx64 " (expire)",
+ e->id);
+ evp.id = e->id;
+ queue_envelope_delete(&evp);
+ free(e);
+ }
+}
+
+static void
+scheduler_process_bounce(struct scheduler_batch *batch)
+{
+ struct envelope evp;
+ struct id_list *e;
+
+ while ((e = batch->evpids)) {
+ batch->evpids = e->next;
+ log_debug("scheduler: scheduling evp:%016" PRIx64 " (bounce)",
+ e->id);
+ queue_envelope_load(e->id, &evp);
+ evp.lasttry = time(NULL);
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE],
+ IMSG_SMTP_ENQUEUE, PROC_SMTP, 0, -1, &evp,
+ sizeof evp);
stat_increment(STATS_SCHEDULER);
stat_increment(STATS_SCHEDULER_BOUNCES);
- break;
-
- case D_MDA:
- backend->fetch(batch, &evpid);
- if (! queue_envelope_load(evpid, &evp))
- goto end;
-
+ free(e);
+ }
+}
+
+static void
+scheduler_process_mda(struct scheduler_batch *batch)
+{
+ struct envelope evp;
+ struct id_list *e;
+ int fd;
+
+ while ((e = batch->evpids)) {
+ batch->evpids = e->next;
+ log_debug("scheduler: scheduling evp:%016" PRIx64 " (mda)",
+ e->id);
+ queue_envelope_load(e->id, &evp);
evp.lasttry = time(NULL);
- fd = queue_message_fd_r(evpid_to_msgid(evpid));
+ fd = queue_message_fd_r(evpid_to_msgid(evp.id));
imsg_compose_event(env->sc_ievs[PROC_QUEUE],
IMSG_MDA_SESS_NEW, PROC_MDA, 0, fd, &evp,
sizeof evp);
- backend->schedule(evpid);
-
stat_increment(STATS_SCHEDULER);
stat_increment(STATS_MDA_SESSION);
- break;
+ free(e);
+ }
+}
- case D_MTA: {
- struct mta_batch mta_batch;
+static void
+scheduler_process_mta(struct scheduler_batch *batch)
+{
+ struct envelope evp;
+ struct mta_batch mta_batch;
+ struct id_list *e;
- /* FIXME */
- if (! backend->fetch(batch, &evpid))
- goto end;
- if (! queue_envelope_load(evpid, &evp))
- goto end;
+ queue_envelope_load(batch->evpids->id, &evp);
- bzero(&mta_batch, sizeof mta_batch);
- mta_batch.id = arc4random();
- mta_batch.relay = evp.agent.mta.relay;
+ bzero(&mta_batch, sizeof mta_batch);
+ mta_batch.id = arc4random();
+ mta_batch.relay = evp.agent.mta.relay;
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_BATCH_CREATE, PROC_MTA, 0, -1, &mta_batch,
- sizeof mta_batch);
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE],
+ IMSG_BATCH_CREATE, PROC_MTA, 0, -1, &mta_batch,
+ sizeof mta_batch);
- while (backend->fetch(batch, &evpid)) {
- if (! queue_envelope_load(evpid, &evp))
- goto end;
- evp.lasttry = time(NULL); /* FIXME */
- evp.batch_id = mta_batch.id;
+ while ((e = batch->evpids)) {
+ batch->evpids = e->next;
+ log_debug("scheduler: scheduling evp:%016" PRIx64 " (mta)",
+ e->id);
+ queue_envelope_load(e->id, &evp);
+ evp.lasttry = time(NULL);
+ evp.batch_id = mta_batch.id;
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_BATCH_APPEND,
+ PROC_MTA, 0, -1, &evp, sizeof evp);
+ free(e);
+ stat_increment(STATS_SCHEDULER);
+ }
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_BATCH_APPEND, PROC_MTA, 0, -1, &evp,
- sizeof evp);
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_BATCH_CLOSE,
+ PROC_MTA, 0, -1, &mta_batch, sizeof mta_batch);
- backend->schedule(evpid);
- stat_increment(STATS_SCHEDULER);
- }
+ stat_increment(STATS_MTA_SESSION);
+}
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_BATCH_CLOSE, PROC_MTA, 0, -1, &mta_batch,
- sizeof mta_batch);
+void
+scheduler_envelope_update(struct envelope *e)
+{
+ struct scheduler_info si;
- stat_increment(STATS_MTA_SESSION);
- break;
- }
-
- default:
- fatalx("scheduler_process_batchqueue: unknown type");
- }
+ scheduler_info(&si, e);
+ backend->update(&si);
+ scheduler_reset_events();
+}
-end:
- backend->close(batch);
- return 1;
+void
+scheduler_envelope_delete(struct envelope *e)
+{
+ backend->delete(e->id);
+ scheduler_reset_events();
}
static int
@@ -506,7 +490,7 @@ scheduler_load_message(u_int32_t msgid)
struct qwalk *q;
u_int64_t evpid;
struct envelope envelope;
- struct scheduler_info si;
+ struct scheduler_info si;
q = qwalk_new(msgid);
while (qwalk(q, &evpid)) {
@@ -515,98 +499,8 @@ scheduler_load_message(u_int32_t msgid)
scheduler_info(&si, &envelope);
backend->insert(&si);
}
- qwalk_close(q);
+ qwalk_close(q);
+ backend->commit(msgid);
return 1;
}
-
-static int
-scheduler_check_loop(struct envelope *ep)
-{
- int fd;
- FILE *fp;
- char *buf, *lbuf;
- size_t len;
- struct mailaddr maddr;
- int ret = 0;
- int rcvcount = 0;
-
- fd = queue_message_fd_r(evpid_to_msgid(ep->id));
- if ((fp = fdopen(fd, "r")) == NULL)
- fatal("fdopen");
-
- lbuf = NULL;
- while ((buf = fgetln(fp, &len))) {
- if (buf[len - 1] == '\n')
- buf[len - 1] = '\0';
- else {
- /* EOF without EOL, copy and add the NUL */
- if ((lbuf = malloc(len + 1)) == NULL)
- err(1, NULL);
- memcpy(lbuf, buf, len);
- lbuf[len] = '\0';
- buf = lbuf;
- }
-
- if (strchr(buf, ':') == NULL && !isspace((int)*buf))
- break;
-
- if (strncasecmp("Received: ", buf, 10) == 0) {
- rcvcount++;
- if (rcvcount == MAX_HOPS_COUNT) {
- ret = 1;
- break;
- }
- }
-
- else if (strncasecmp("Delivered-To: ", buf, 14) == 0) {
- struct mailaddr dest;
-
- bzero(&maddr, sizeof (struct mailaddr));
- if (! email_to_mailaddr(&maddr, buf + 14))
- continue;
-
- dest = ep->dest;
- if (ep->type == D_BOUNCE)
- dest = ep->sender;
-
- if (strcasecmp(maddr.user, dest.user) == 0 &&
- strcasecmp(maddr.domain, dest.domain) == 0) {
- ret = 1;
- break;
- }
- }
- }
- free(lbuf);
-
- fclose(fp);
- return ret;
-}
-
-static void
-scheduler_remove(u_int64_t id)
-{
- void *msg;
-
- /* removing by evpid */
- if (id > 0xffffffffL) {
- scheduler_remove_envelope(id);
- return;
- }
-
- /* removing by msgid */
- msg = backend->message(id);
- while (backend->fetch(msg, &id))
- scheduler_remove_envelope(id);
- backend->close(msg);
-}
-
-static void
-scheduler_remove_envelope(u_int64_t evpid)
-{
- struct envelope evp;
-
- evp.id = evpid;
- queue_envelope_delete(&evp);
- backend->remove(evpid);
-}
diff --git a/usr.sbin/smtpd/scheduler_backend.c b/usr.sbin/smtpd/scheduler_backend.c
index e22411fb7d9..7dafbf488b4 100644
--- a/usr.sbin/smtpd/scheduler_backend.c
+++ b/usr.sbin/smtpd/scheduler_backend.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: scheduler_backend.c,v 1.2 2012/07/28 12:06:41 chl Exp $ */
+/* $OpenBSD: scheduler_backend.c,v 1.3 2012/08/08 08:50:42 eric Exp $ */
/*
* Copyright (c) 2012 Gilles Chehade <gilles@openbsd.org>
@@ -57,3 +57,24 @@ scheduler_info(struct scheduler_info *sched, struct envelope *evp)
sched->expire = evp->expire;
sched->retry = evp->retry;
}
+
+time_t
+scheduler_compute_schedule(struct scheduler_info *sched)
+{
+ time_t delay;
+
+ if (sched->retry == 0)
+ delay = 0;
+#if 0
+ /* for testing scheduler sleep */
+ delay == arc4random() % 30;
+#endif
+ else if (sched->retry < 4)
+ delay = (sched->retry * 15 * 60);
+ else if (sched->retry < 8)
+ delay = ((sched->retry - 3) * 60 * 60);
+ else
+ delay = ((sched->retry - 7) * 24 * 60 * 60);
+
+ return (sched->creation + delay);
+}
diff --git a/usr.sbin/smtpd/scheduler_ramqueue.c b/usr.sbin/smtpd/scheduler_ramqueue.c
index a2ab2f1254d..07717fcd616 100644
--- a/usr.sbin/smtpd/scheduler_ramqueue.c
+++ b/usr.sbin/smtpd/scheduler_ramqueue.c
@@ -1,7 +1,8 @@
-/* $OpenBSD: scheduler_ramqueue.c,v 1.11 2012/07/10 11:13:40 gilles Exp $ */
+/* $OpenBSD: scheduler_ramqueue.c,v 1.12 2012/08/08 08:50:42 eric Exp $ */
/*
* Copyright (c) 2012 Gilles Chehade <gilles@openbsd.org>
+ * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
@@ -36,787 +37,703 @@
#include "smtpd.h"
#include "log.h"
+SPLAY_HEAD(hosttree, rq_host);
+TAILQ_HEAD(evplist, rq_envelope);
-struct ramqueue_host {
- RB_ENTRY(ramqueue_host) hosttree_entry;
- TAILQ_HEAD(,ramqueue_batch) batch_queue;
- char hostname[MAXHOSTNAMELEN];
+struct rq_host {
+ SPLAY_ENTRY(rq_host) hosttree_entry;
+ char host[MAXHOSTNAMELEN];
+ struct tree batches;
};
-struct ramqueue_batch {
- enum delivery_type type;
- TAILQ_ENTRY(ramqueue_batch) batch_entry;
- TAILQ_HEAD(,ramqueue_envelope) envelope_queue;
- struct ramqueue_host *rq_host;
- u_int64_t b_id;
- u_int32_t msgid;
- u_int32_t evpcnt;
-};
-struct ramqueue_envelope {
- TAILQ_ENTRY(ramqueue_envelope) queue_entry;
- TAILQ_ENTRY(ramqueue_envelope) batchqueue_entry;
- RB_ENTRY(ramqueue_envelope) evptree_entry;
- struct ramqueue_batch *rq_batch;
- struct ramqueue_message *rq_msg;
- struct ramqueue_host *rq_host;
- u_int64_t evpid;
- time_t sched;
-};
-struct ramqueue_message {
- RB_ENTRY(ramqueue_message) msgtree_entry;
- RB_HEAD(evptree, ramqueue_envelope) evptree;
- u_int32_t msgid;
- u_int32_t evpcnt;
+
+struct rq_message {
+ uint32_t msgid;
+ struct tree envelopes;
};
-struct ramqueue {
- RB_HEAD(hosttree, ramqueue_host) hosttree;
- RB_HEAD(msgtree, ramqueue_message) msgtree;
- RB_HEAD(offloadtree, ramqueue_envelope) offloadtree;
- TAILQ_HEAD(,ramqueue_envelope) queue;
+
+struct rq_batch {
+ uint32_t msgid;
+ struct tree envelopes;
+ struct rq_host *host;
};
-RB_PROTOTYPE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp);
-RB_PROTOTYPE(msgtree, ramqueue_message, msg_entry, ramqueue_msg_cmp);
-RB_PROTOTYPE(evptree, ramqueue_envelope, evp_entry, ramqueue_evp_cmp);
-RB_PROTOTYPE(offloadtree, ramqueue_envelope, evp_entry, ramqueue_evp_cmp);
+struct rq_envelope {
+ TAILQ_ENTRY(rq_envelope) entry;
+
+ uint64_t evpid;
+ int type;
+
+#define RQ_ENVELOPE_INFLIGHT 0x01
+#define RQ_ENVELOPE_EXPIRED 0x02
+ uint8_t flags;
-enum ramqueue_iter_type {
- RAMQUEUE_ITER_HOST,
- RAMQUEUE_ITER_BATCH,
- RAMQUEUE_ITER_MESSAGE,
- RAMQUEUE_ITER_QUEUE
+ time_t sched;
+ time_t expire;
+
+ struct rq_message *message;
+ struct rq_batch *batch;
+ struct evplist *queue;
};
-struct ramqueue_iter {
- enum ramqueue_iter_type type;
- union {
- struct ramqueue_host *host;
- struct ramqueue_batch *batch;
- struct ramqueue_message *message;
- } u;
+struct rq_queue {
+ struct hosttree hosts;
+ struct tree messages;
+ struct evplist mda;
+ struct evplist mta;
+ struct evplist bounce;
+ struct evplist inflight;
+ struct tree expired;
+ struct tree removed;
};
+static void scheduler_ramqueue_init(void);
+static void scheduler_ramqueue_insert(struct scheduler_info *);
+static void scheduler_ramqueue_commit(uint32_t);
+static void scheduler_ramqueue_rollback(uint32_t);
+static void scheduler_ramqueue_update(struct scheduler_info *);
+static void scheduler_ramqueue_delete(uint64_t);
+static void scheduler_ramqueue_batch(int, time_t, struct scheduler_batch *);
+static void scheduler_ramqueue_schedule(uint64_t);
+static void scheduler_ramqueue_remove(uint64_t);
+
+static int scheduler_ramqueue_next(int, uint64_t *, time_t *);
+static void sorted_insert(struct evplist *, struct rq_envelope *);
+static void sorted_merge(struct evplist *, struct evplist *);
+
-static int ramqueue_host_cmp(struct ramqueue_host *, struct ramqueue_host *);
-static int ramqueue_msg_cmp(struct ramqueue_message *, struct ramqueue_message *);
-static int ramqueue_evp_cmp(struct ramqueue_envelope *, struct ramqueue_envelope *);
-static struct ramqueue_host *ramqueue_lookup_host(char *);
-static struct ramqueue_host *ramqueue_insert_host(char *);
-static void ramqueue_remove_host(struct ramqueue_host *);
-static struct ramqueue_batch *ramqueue_lookup_batch(struct ramqueue_host *,
- u_int32_t);
-static struct ramqueue_batch *ramqueue_insert_batch(struct ramqueue_host *,
- u_int32_t);
-static void ramqueue_remove_batch(struct ramqueue_host *, struct ramqueue_batch *);
-static struct ramqueue_message *ramqueue_lookup_message(u_int32_t);
-static struct ramqueue_message *ramqueue_insert_message(u_int32_t);
-static void ramqueue_remove_message(struct ramqueue_message *);
-
-static struct ramqueue_envelope *ramqueue_lookup_envelope(u_int64_t);
-static struct ramqueue_envelope *ramqueue_lookup_offload(u_int64_t);
-
-
-/*NEEDSFIX*/
-static int ramqueue_expire(struct envelope *);
-static time_t ramqueue_next_schedule(struct scheduler_info *, time_t);
-
-static void scheduler_ramqueue_init(void);
-static int scheduler_ramqueue_setup(void);
-static int scheduler_ramqueue_next(u_int64_t *, time_t *);
-static void scheduler_ramqueue_insert(struct scheduler_info *);
-static void scheduler_ramqueue_schedule(u_int64_t);
-static void scheduler_ramqueue_remove(u_int64_t);
-static void *scheduler_ramqueue_host(char *);
-static void *scheduler_ramqueue_message(u_int32_t);
-static void *scheduler_ramqueue_batch(u_int64_t);
-static void *scheduler_ramqueue_queue(void);
-static void scheduler_ramqueue_close(void *);
-static int scheduler_ramqueue_fetch(void *, u_int64_t *);
-static int scheduler_ramqueue_force(u_int64_t);
-static void scheduler_ramqueue_display(void);
+static void rq_queue_init(struct rq_queue *);
+static void rq_queue_merge(struct rq_queue *, struct rq_queue *);
+static void rq_queue_dump(struct rq_queue *, const char *, time_t);
+static void rq_envelope_delete(struct rq_queue *, struct rq_envelope *);
+static const char *rq_envelope_to_text(struct rq_envelope *, time_t);
+static struct rq_host *rq_host_lookup(struct hosttree *, char *);
+static struct rq_host *rq_host_create(struct hosttree *, char *);
+static int rq_host_cmp(struct rq_host *, struct rq_host *);
+
+SPLAY_PROTOTYPE(hosttree, rq_host, hosttree_entry, rq_host_cmp);
struct scheduler_backend scheduler_backend_ramqueue = {
scheduler_ramqueue_init,
- scheduler_ramqueue_setup,
- scheduler_ramqueue_next,
- scheduler_ramqueue_insert,
- scheduler_ramqueue_schedule,
- scheduler_ramqueue_remove,
- scheduler_ramqueue_host,
- scheduler_ramqueue_message,
- scheduler_ramqueue_batch,
- scheduler_ramqueue_queue,
- scheduler_ramqueue_close,
- scheduler_ramqueue_fetch,
- scheduler_ramqueue_force,
- scheduler_ramqueue_display
-};
-static struct ramqueue ramqueue;
-static void
-scheduler_ramqueue_display_hosttree(void)
-{
- struct ramqueue_host *rq_host;
- struct ramqueue_batch *rq_batch;
- struct ramqueue_envelope *rq_evp;
-
- log_debug("\tscheduler_ramqueue: hosttree display");
- RB_FOREACH(rq_host, hosttree, &ramqueue.hosttree) {
- log_debug("\t\thost: [%p] %s", rq_host, rq_host->hostname);
- TAILQ_FOREACH(rq_batch, &rq_host->batch_queue, batch_entry) {
- log_debug("\t\t\tbatch: [%p] %016x",
- rq_batch, rq_batch->msgid);
- TAILQ_FOREACH(rq_evp, &rq_batch->envelope_queue,
- batchqueue_entry) {
- log_debug("\t\t\t\tevpid: [%p] %016"PRIx64,
- rq_evp, rq_evp->evpid);
- }
- }
- }
-}
+ scheduler_ramqueue_insert,
+ scheduler_ramqueue_commit,
+ scheduler_ramqueue_rollback,
-static void
-scheduler_ramqueue_display_msgtree(void)
-{
- struct ramqueue_message *rq_msg;
- struct ramqueue_envelope *rq_evp;
-
- log_debug("\tscheduler_ramqueue: msgtree display");
- RB_FOREACH(rq_msg, msgtree, &ramqueue.msgtree) {
- log_debug("\t\tmsg: [%p] %016x", rq_msg, rq_msg->msgid);
- RB_FOREACH(rq_evp, evptree, &rq_msg->evptree) {
- log_debug("\t\t\tevp: [%p] %016"PRIx64,
- rq_evp, rq_evp->evpid);
- }
- }
-}
+ scheduler_ramqueue_update,
+ scheduler_ramqueue_delete,
-static void
-scheduler_ramqueue_display_offloadtree(void)
-{
- struct ramqueue_envelope *rq_evp;
+ scheduler_ramqueue_batch,
- log_debug("\tscheduler_ramqueue: offloadtree display");
- RB_FOREACH(rq_evp, offloadtree, &ramqueue.offloadtree) {
- log_debug("\t\t\tevp: [%p] %016"PRIx64,
- rq_evp, rq_evp->evpid);
- }
-}
+ scheduler_ramqueue_schedule,
+ scheduler_ramqueue_remove,
+};
-static void
-scheduler_ramqueue_display_queue(void)
-{
- struct ramqueue_envelope *rq_evp;
+static struct rq_queue ramqueue;
+static struct tree updates;
- log_debug("\tscheduler_ramqueue: queue display");
- TAILQ_FOREACH(rq_evp, &ramqueue.queue, queue_entry) {
- log_debug("\t\tevpid: [%p] [batch: %p], %016"PRIx64,
- rq_evp, rq_evp->rq_batch, rq_evp->evpid);
- }
-}
-
-static void
-scheduler_ramqueue_display(void)
-{
- log_debug("scheduler_ramqueue: display");
- scheduler_ramqueue_display_hosttree();
- scheduler_ramqueue_display_msgtree();
- scheduler_ramqueue_display_offloadtree();
- scheduler_ramqueue_display_queue();
-}
+extern int verbose;
static void
scheduler_ramqueue_init(void)
{
- log_debug("scheduler_ramqueue: init");
- bzero(&ramqueue, sizeof (ramqueue));
- TAILQ_INIT(&ramqueue.queue);
- RB_INIT(&ramqueue.hosttree);
- RB_INIT(&ramqueue.msgtree);
- RB_INIT(&ramqueue.offloadtree);
+ rq_queue_init(&ramqueue);
+ tree_init(&updates);
}
-static int
-scheduler_ramqueue_setup(void)
+static void
+scheduler_ramqueue_insert(struct scheduler_info *si)
{
- struct envelope envelope;
- static struct qwalk *q = NULL;
- u_int64_t evpid;
- struct scheduler_info si;
-
- log_debug("scheduler_ramqueue: load");
-
- log_info("scheduler_ramqueue: queue loading in progress");
- if (q == NULL)
- q = qwalk_new(0);
-
- while (qwalk(q, &evpid)) {
- /* the envelope is already in ramqueue, skip */
- if (ramqueue_lookup_envelope(evpid) ||
- ramqueue_lookup_offload(evpid))
- continue;
-
- if (! queue_envelope_load(evpid, &envelope)) {
- log_debug("scheduler_ramqueue: evp -> /corrupt");
- queue_message_corrupt(evpid_to_msgid(evpid));
- continue;
- }
- if (ramqueue_expire(&envelope))
- continue;
+ uint32_t msgid;
+ struct rq_queue *update;
+ struct rq_host *host;
+ struct rq_message *message;
+ struct rq_batch *batch;
+ struct rq_envelope *envelope;
- scheduler_info(&si, &envelope);
- scheduler_ramqueue_insert(&si);
+ msgid = evpid_to_msgid(si->evpid);
- log_debug("ramqueue: loading interrupted");
- return (0);
+ /* find/prepare a ramqueue update */
+ if ((update = tree_get(&updates, msgid)) == NULL) {
+ update = xcalloc(1, sizeof *update, "scheduler_insert");
+ rq_queue_init(update);
+ tree_xset(&updates, msgid, update);
}
- qwalk_close(q);
- q = NULL;
- log_debug("ramqueue: loading over");
- return (1);
-}
-static int
-scheduler_ramqueue_next(u_int64_t *evpid, time_t *sched)
-{
- struct ramqueue_envelope *rq_evp = NULL;
+ /* find/prepare the host in ramqueue update */
+ if ((host = rq_host_lookup(&update->hosts, si->destination)) == NULL)
+ host = rq_host_create(&update->hosts, si->destination);
- log_debug("scheduler_ramqueue: next");
- TAILQ_FOREACH(rq_evp, &ramqueue.queue, queue_entry) {
- if (rq_evp->rq_batch->type == D_MDA)
- if (env->sc_flags & (SMTPD_MDA_PAUSED|SMTPD_MDA_BUSY))
- continue;
- if (rq_evp->rq_batch->type == D_MTA)
- if (env->sc_flags & (SMTPD_MTA_PAUSED|SMTPD_MTA_BUSY))
- continue;
- if (evpid)
- *evpid = rq_evp->evpid;
- if (sched)
- *sched = rq_evp->sched;
- log_debug("scheduler_ramqueue: next: found");
- return 1;
+ /* find/prepare the hosttree message in ramqueue update */
+ if ((batch = tree_get(&host->batches, msgid)) == NULL) {
+ batch = xcalloc(1, sizeof *batch, "scheduler_insert");
+ batch->msgid = msgid;
+ tree_init(&batch->envelopes);
+ tree_xset(&host->batches, msgid, batch);
}
- log_debug("scheduler_ramqueue: next: nothing schedulable");
- return 0;
-}
-
-static void
-scheduler_ramqueue_insert(struct scheduler_info *si)
-{
- struct ramqueue_host *rq_host;
- struct ramqueue_message *rq_msg;
- struct ramqueue_batch *rq_batch;
- struct ramqueue_envelope *rq_evp, *evp;
- u_int32_t msgid;
- time_t curtm = time(NULL);
-
- log_debug("scheduler_ramqueue: insert");
-
- rq_evp = ramqueue_lookup_offload(si->evpid);
- if (rq_evp) {
- rq_msg = rq_evp->rq_msg;
- rq_batch = rq_evp->rq_batch;
- rq_host = rq_evp->rq_host;
- RB_REMOVE(offloadtree, &ramqueue.offloadtree, rq_evp);
- }
- else {
- msgid = evpid_to_msgid(si->evpid);
- rq_msg = ramqueue_lookup_message(msgid);
- if (rq_msg == NULL)
- rq_msg = ramqueue_insert_message(msgid);
-
- rq_host = ramqueue_lookup_host(si->destination);
- if (rq_host == NULL)
- rq_host = ramqueue_insert_host(si->destination);
-
- rq_batch = ramqueue_lookup_batch(rq_host, msgid);
- if (rq_batch == NULL)
- rq_batch = ramqueue_insert_batch(rq_host, msgid);
-
- rq_evp = calloc(1, sizeof (*rq_evp));
- if (rq_evp == NULL)
- fatal("calloc");
- rq_evp->evpid = si->evpid;
- rq_batch->evpcnt++;
- rq_msg->evpcnt++;
+ /* find/prepare the msgtree message in ramqueue update */
+ if ((message = tree_get(&update->messages, msgid)) == NULL) {
+ message = xcalloc(1, sizeof *message, "scheduler_insert");
+ message->msgid = msgid;
+ tree_init(&message->envelopes);
+ tree_xset(&update->messages, msgid, message);
}
- rq_evp->sched = ramqueue_next_schedule(si, curtm);
- rq_evp->rq_host = rq_host;
- rq_evp->rq_batch = rq_batch;
- rq_evp->rq_msg = rq_msg;
- RB_INSERT(evptree, &rq_msg->evptree, rq_evp);
- TAILQ_INSERT_TAIL(&rq_batch->envelope_queue, rq_evp,
- batchqueue_entry);
-
- /* sorted insert */
- TAILQ_FOREACH(evp, &ramqueue.queue, queue_entry) {
- if (evp->sched >= rq_evp->sched) {
- TAILQ_INSERT_BEFORE(evp, rq_evp, queue_entry);
- break;
- }
+ /* create envelope in ramqueue message */
+ envelope = xcalloc(1, sizeof *envelope, "scheduler_insert");
+ envelope->evpid = si->evpid;
+ envelope->type = si->type;
+ envelope->message = message;
+ envelope->batch = batch;
+ envelope->sched = scheduler_compute_schedule(si);
+ envelope->expire = si->creation + si->expire;
+
+ if (envelope->expire < envelope->sched) {
+ envelope->flags |= RQ_ENVELOPE_EXPIRED;
+ tree_xset(&update->expired, envelope->evpid, envelope);
}
- if (evp == NULL)
- TAILQ_INSERT_TAIL(&ramqueue.queue, rq_evp, queue_entry);
-
- stat_increment(STATS_RAMQUEUE_ENVELOPE);
-}
-static void
-scheduler_ramqueue_schedule(u_int64_t evpid)
-{
- struct ramqueue_envelope *rq_evp;
- struct ramqueue_message *rq_msg;
- struct ramqueue_batch *rq_batch;
-
- log_debug("scheduler_ramqueue: schedule");
+ tree_xset(&batch->envelopes, envelope->evpid, envelope);
+ tree_xset(&message->envelopes, envelope->evpid, envelope);
- rq_evp = ramqueue_lookup_envelope(evpid);
- rq_msg = rq_evp->rq_msg;
- rq_batch = rq_evp->rq_batch;
+ if (si->type == D_BOUNCE)
+ envelope->queue = &update->bounce;
+ else if (si->type == D_MDA)
+ envelope->queue = &update->mda;
+ else if (si->type == D_MTA)
+ envelope->queue = &update->mta;
+ else
+ errx(1, "bad type");
- /* remove from msg tree, batch queue and linear queue */
- RB_REMOVE(evptree, &rq_msg->evptree, rq_evp);
- TAILQ_REMOVE(&rq_batch->envelope_queue, rq_evp, batchqueue_entry);
- TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry);
+ sorted_insert(envelope->queue, envelope);
- /* insert into offload tree*/
- RB_INSERT(offloadtree, &ramqueue.offloadtree, rq_evp);
-
- /* that's one less envelope to process in the ramqueue */
- stat_decrement(STATS_RAMQUEUE_ENVELOPE);
+ if (verbose & TRACE_SCHEDULER)
+ rq_queue_dump(update, "inserted", time(NULL));
}
static void
-scheduler_ramqueue_remove(u_int64_t evpid)
+scheduler_ramqueue_commit(uint32_t msgid)
{
- struct ramqueue_batch *rq_batch;
- struct ramqueue_message *rq_msg;
- struct ramqueue_envelope *rq_evp;
- struct ramqueue_host *rq_host;
-
- log_debug("scheduler_ramqueue: remove");
-
- rq_evp = ramqueue_lookup_offload(evpid);
- if (rq_evp) {
- RB_REMOVE(offloadtree, &ramqueue.offloadtree, rq_evp);
- rq_msg = rq_evp->rq_msg;
- rq_batch = rq_evp->rq_batch;
- rq_host = rq_evp->rq_host;
- }
- else {
- rq_evp = ramqueue_lookup_envelope(evpid);
- rq_msg = rq_evp->rq_msg;
- rq_batch = rq_evp->rq_batch;
- rq_host = rq_evp->rq_host;
-
- RB_REMOVE(evptree, &rq_msg->evptree, rq_evp);
- TAILQ_REMOVE(&rq_batch->envelope_queue, rq_evp, batchqueue_entry);
- TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry);
- stat_decrement(STATS_RAMQUEUE_ENVELOPE);
- }
+ struct rq_queue *update;
+ time_t now;
- rq_batch->evpcnt--;
- rq_msg->evpcnt--;
+ update = tree_xpop(&updates, msgid);
- /* check if we are the last of a message */
- if (rq_msg->evpcnt == 0) {
- ramqueue_remove_message(rq_msg);
+ if (verbose & TRACE_SCHEDULER) {
+ now = time(NULL);
+ rq_queue_dump(update, "commit update", now);
+ rq_queue_dump(&ramqueue, "before commit", now);
}
+ rq_queue_merge(&ramqueue, update);
- /* check if we are the last of a batch */
- if (rq_batch->evpcnt == 0) {
- ramqueue_remove_batch(rq_host, rq_batch);
- }
-
- /* check if we are the last of a host */
- if (TAILQ_FIRST(&rq_host->batch_queue) == NULL) {
- ramqueue_remove_host(rq_host);
- }
+ if (verbose & TRACE_SCHEDULER)
+ rq_queue_dump(&ramqueue, "after commit", now);
- free(rq_evp);
+ free(update);
}
-static void *
-scheduler_ramqueue_host(char *host)
+static void
+scheduler_ramqueue_rollback(uint32_t msgid)
{
- struct ramqueue_iter *iter;
- struct ramqueue_host *rq_host;
-
- rq_host = ramqueue_lookup_host(host);
- if (rq_host == NULL)
- return NULL;
+ struct rq_queue *update;
+ struct rq_envelope *envelope;
- iter = calloc(1, sizeof *iter);
- if (iter == NULL)
- err(1, "calloc");
+ update = tree_xpop(&updates, msgid);
- iter->type = RAMQUEUE_ITER_HOST;
- iter->u.host = rq_host;
+ while ((envelope = TAILQ_FIRST(&update->bounce)))
+ rq_envelope_delete(update, envelope);
+ while ((envelope = TAILQ_FIRST(&update->mda)))
+ rq_envelope_delete(update, envelope);
+ while ((envelope = TAILQ_FIRST(&update->mta)))
+ rq_envelope_delete(update, envelope);
- return iter;
+ free(update);
}
-static void *
-scheduler_ramqueue_batch(u_int64_t evpid)
+static void
+scheduler_ramqueue_update(struct scheduler_info *si)
{
- struct ramqueue_iter *iter;
- struct ramqueue_envelope *rq_evp;
+ struct rq_message *message;
+ struct rq_envelope *envelope;
+ uint32_t msgid;
- rq_evp = ramqueue_lookup_envelope(evpid);
- if (rq_evp == NULL)
- return NULL;
+ msgid = evpid_to_msgid(si->evpid);
+ message = tree_xget(&ramqueue.messages, msgid);
+ envelope = tree_xget(&message->envelopes, si->evpid);
- iter = calloc(1, sizeof *iter);
- if (iter == NULL)
- err(1, "calloc");
+ /* it *should* be in-flight */
+ if (!(envelope->flags & RQ_ENVELOPE_INFLIGHT))
+ log_warnx("evp:%016" PRIx64 " not in-flight", si->evpid);
+
+ envelope->flags &= ~RQ_ENVELOPE_INFLIGHT;
+ envelope->sched = scheduler_compute_schedule(si);
+ if (envelope->expire < envelope->sched) {
+ envelope->flags |= RQ_ENVELOPE_EXPIRED;
+ tree_xset(&ramqueue.expired, envelope->evpid, envelope);
+ }
- iter->type = RAMQUEUE_ITER_BATCH;
- iter->u.batch = rq_evp->rq_batch;
+ TAILQ_REMOVE(envelope->queue, envelope, entry);
+ if (si->type == D_BOUNCE)
+ envelope->queue = &ramqueue.bounce;
+ else if (si->type == D_MDA)
+ envelope->queue = &ramqueue.mda;
+ else if (si->type == D_MTA)
+ envelope->queue = &ramqueue.mta;
- return iter;
+ sorted_insert(envelope->queue, envelope);
}
-static void *
-scheduler_ramqueue_message(u_int32_t msgid)
+static void
+scheduler_ramqueue_delete(uint64_t evpid)
{
- struct ramqueue_iter *iter;
- struct ramqueue_message *rq_msg;
-
- rq_msg = ramqueue_lookup_message(msgid);
- if (rq_msg == NULL)
- return NULL;
-
- iter = calloc(1, sizeof *iter);
- if (iter == NULL)
- err(1, "calloc");
+ struct rq_message *message;
+ struct rq_envelope *envelope;
+ uint32_t msgid;
- iter->type = RAMQUEUE_ITER_MESSAGE;
- iter->u.message = rq_msg;
+ msgid = evpid_to_msgid(evpid);
+ message = tree_xget(&ramqueue.messages, msgid);
+ envelope = tree_xget(&message->envelopes, evpid);
- return iter;
+ /* it *must* be in-flight */
+ if (!(envelope->flags & RQ_ENVELOPE_INFLIGHT))
+ errx(1, "evp:%016" PRIx64 " not in-flight", evpid);
+ rq_envelope_delete(&ramqueue, envelope);
}
-static void *
-scheduler_ramqueue_queue(void)
-{
- struct ramqueue_iter *iter;
+static int
+scheduler_ramqueue_next(int typemask, uint64_t *evpid, time_t *sched)
+{
+ struct rq_envelope *evp_mda = NULL;
+ struct rq_envelope *evp_mta = NULL;
+ struct rq_envelope *evp_bounce = NULL;
+ struct rq_envelope *envelope = NULL;
+
+ if (verbose & TRACE_SCHEDULER)
+ rq_queue_dump(&ramqueue, "next", time(NULL));
+
+ *sched = 0;
+
+ if (typemask & SCHED_REMOVE && tree_root(&ramqueue.removed, evpid, NULL))
+ return (1);
+ if (typemask & SCHED_EXPIRE && tree_root(&ramqueue.expired, evpid, NULL))
+ return (1);
+
+ /* fetch first envelope from each queue */
+ if (typemask & SCHED_BOUNCE)
+ evp_bounce = TAILQ_FIRST(&ramqueue.bounce);
+ if (typemask & SCHED_MDA)
+ evp_mda = TAILQ_FIRST(&ramqueue.mda);
+ if (typemask & SCHED_MTA)
+ evp_mta = TAILQ_FIRST(&ramqueue.mta);
+
+ /* set current envelope to either one */
+ if (evp_bounce)
+ envelope = evp_bounce;
+ else if (evp_mda)
+ envelope = evp_mda;
+ else if (evp_mta)
+ envelope = evp_mta;
+ else
+ return (0);
- iter = calloc(1, sizeof *iter);
- if (iter == NULL)
- err(1, "calloc");
+ /* figure out which one should be scheduled first */
+ if (evp_bounce && evp_bounce->sched < envelope->sched)
+ envelope = evp_bounce;
+ if (evp_mda && evp_mda->sched < envelope->sched)
+ envelope = evp_mda;
+ if (evp_mta && evp_mta->sched < envelope->sched)
+ envelope = evp_mta;
- iter->type = RAMQUEUE_ITER_QUEUE;
+ *evpid = envelope->evpid;
+ *sched = envelope->sched;
- return iter;
+ return (1);
}
static void
-scheduler_ramqueue_close(void *hdl)
-{
- free(hdl);
-}
-
-int
-scheduler_ramqueue_fetch(void *hdl, u_int64_t *evpid)
-{
- struct ramqueue_iter *iter = hdl;
- struct ramqueue_envelope *rq_evp;
- struct ramqueue_batch *rq_batch;
-
- switch (iter->type) {
- case RAMQUEUE_ITER_HOST:
- rq_batch = TAILQ_FIRST(&iter->u.host->batch_queue);
- if (rq_batch == NULL)
- break;
- rq_evp = TAILQ_FIRST(&rq_batch->envelope_queue);
- if (rq_evp == NULL)
- break;
- *evpid = rq_evp->evpid;
- return 1;
-
- case RAMQUEUE_ITER_BATCH:
- rq_evp = TAILQ_FIRST(&iter->u.batch->envelope_queue);
- if (rq_evp == NULL)
- break;
- *evpid = rq_evp->evpid;
- return 1;
-
- case RAMQUEUE_ITER_MESSAGE:
- rq_evp = RB_ROOT(&iter->u.message->evptree);
- if (rq_evp == NULL)
- break;
- *evpid = rq_evp->evpid;
- return 1;
-
- case RAMQUEUE_ITER_QUEUE:
- rq_evp = TAILQ_FIRST(&ramqueue.queue);
- if (rq_evp == NULL)
- break;
- *evpid = rq_evp->evpid;
- return 1;
+scheduler_ramqueue_batch(int typemask, time_t curr, struct scheduler_batch *ret)
+{
+ struct rq_message *message;
+ struct rq_batch *batch;
+ struct rq_envelope *envelope;
+ struct id_list *item;
+ uint64_t evpid;
+ void *i;
+ int type;
+ time_t sched;
+
+ ret->evpids = NULL;
+
+ if (!scheduler_ramqueue_next(typemask, &evpid, &sched)) {
+ ret->type = SCHED_NONE;
+ return;
}
- return 0;
-}
-
-static int
-scheduler_ramqueue_force(u_int64_t id)
-{
- struct ramqueue_envelope *rq_evp;
- struct ramqueue_message *rq_msg;
- int ret;
-
- /* schedule *all* */
- if (id == 0) {
- ret = 0;
- TAILQ_FOREACH(rq_evp, &ramqueue.queue, queue_entry) {
- rq_evp->sched = 0;
- ret++;
+ if (tree_get(&ramqueue.removed, evpid)) {
+ ret->type = SCHED_REMOVE;
+ while (tree_poproot(&ramqueue.removed, &evpid, NULL)) {
+ item = xmalloc(sizeof *item, "schedule_batch");
+ item->id = evpid;
+ item->next = ret->evpids;
+ ret->evpids = item;
}
- return ret;
+ return;
}
- /* scheduling by evpid */
- if (id > 0xffffffffL) {
- rq_evp = ramqueue_lookup_envelope(id);
- if (rq_evp == NULL)
- return 0;
+ message = tree_xget(&ramqueue.messages, evpid_to_msgid(evpid));
+ envelope = tree_xget(&message->envelopes, evpid);
+
+ /* if the envelope has expired, return the expired list */
+ if (envelope->flags & RQ_ENVELOPE_EXPIRED) {
+ ret->type = SCHED_EXPIRE;
+ while (tree_poproot(&ramqueue.expired, &evpid, (void**)&envelope)) {
+ TAILQ_REMOVE(envelope->queue, envelope, entry);
+ TAILQ_INSERT_TAIL(&ramqueue.inflight, envelope, entry);
+ envelope->flags |= RQ_ENVELOPE_INFLIGHT;
+ item = xmalloc(sizeof *item, "schedule_batch");
+ item->id = evpid;
+ item->next = ret->evpids;
+ ret->evpids = item;
+ }
+ return;
+ }
- rq_evp->sched = 0;
- TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry);
- TAILQ_INSERT_HEAD(&ramqueue.queue, rq_evp, queue_entry);
- return 1;
+ if (sched > curr) {
+ ret->type = SCHED_DELAY;
+ ret->delay = sched - curr;
+ return;
}
- rq_msg = ramqueue_lookup_message(id);
- if (rq_msg == NULL)
- return 0;
-
- /* scheduling by msgid */
- ret = 0;
- RB_FOREACH(rq_evp, evptree, &rq_msg->evptree) {
- rq_evp->sched = 0;
- TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry);
- TAILQ_INSERT_HEAD(&ramqueue.queue, rq_evp, queue_entry);
- ret++;
+ batch = envelope->batch;
+ type = envelope->type;
+ if (type == D_BOUNCE)
+ ret->type = SCHED_BOUNCE;
+ else if (type == D_MDA)
+ ret->type = SCHED_MDA;
+ else if (type == D_MTA)
+ ret->type = SCHED_MTA;
+
+ i = NULL;
+ while((tree_iter(&batch->envelopes, &i, &evpid, (void*)&envelope))) {
+ if (envelope->type != type)
+ continue;
+ if (envelope->sched > curr)
+ continue;
+ if (envelope->flags & RQ_ENVELOPE_INFLIGHT)
+ continue;
+ if (envelope->flags & RQ_ENVELOPE_EXPIRED)
+ continue;
+ TAILQ_REMOVE(envelope->queue, envelope, entry);
+ TAILQ_INSERT_TAIL(&ramqueue.inflight, envelope, entry);
+ envelope->queue = &ramqueue.inflight;
+ envelope->flags |= RQ_ENVELOPE_INFLIGHT;
+ item = xmalloc(sizeof *item, "schedule_batch");
+ item->id = evpid;
+ item->next = ret->evpids;
+ ret->evpids = item;
}
- return ret;
}
-static struct ramqueue_host *
-ramqueue_lookup_host(char *host)
-{
- struct ramqueue_host hostkey;
- strlcpy(hostkey.hostname, host, sizeof(hostkey.hostname));
- return RB_FIND(hosttree, &ramqueue.hosttree, &hostkey);
-}
-
-static struct ramqueue_message *
-ramqueue_lookup_message(u_int32_t msgid)
+static void
+scheduler_ramqueue_schedule(uint64_t evpid)
{
- struct ramqueue_message msgkey;
+ struct rq_message *message;
+ struct rq_envelope *envelope;
+ uint32_t msgid;
- msgkey.msgid = msgid;
- return RB_FIND(msgtree, &ramqueue.msgtree, &msgkey);
-}
+ msgid = evpid_to_msgid(evpid);
+ if ((message = tree_get(&ramqueue.messages, msgid)) == NULL)
+ return;
+ if ((envelope = tree_xget(&message->envelopes, evpid)) == NULL)
+ return;
+ if (envelope->flags & RQ_ENVELOPE_INFLIGHT)
+ return;
-static struct ramqueue_envelope *
-ramqueue_lookup_offload(u_int64_t evpid)
-{
- struct ramqueue_envelope evpkey;
+ envelope->sched = time(NULL);
+ TAILQ_REMOVE(envelope->queue, envelope, entry);
+ sorted_insert(envelope->queue, envelope);
+}
- evpkey.evpid = evpid;
- return RB_FIND(offloadtree, &ramqueue.offloadtree, &evpkey);
+static void
+scheduler_ramqueue_remove(uint64_t evpid)
+{
+ struct rq_message *message;
+ struct rq_envelope *envelope;
+ uint32_t msgid;
+ struct evplist rmlist;
+ void *i;
+
+ if (evpid > 0xffffffff) {
+ msgid = evpid_to_msgid(evpid);
+ if ((message = tree_get(&ramqueue.messages, msgid)) == NULL)
+ return;
+ if ((envelope = tree_xget(&message->envelopes, evpid)) == NULL)
+ return;
+ if (envelope->flags & RQ_ENVELOPE_INFLIGHT)
+ return;
+ rq_envelope_delete(&ramqueue, envelope);
+ tree_xset(&ramqueue.removed, evpid, &ramqueue);
+ }
+ else {
+ msgid = evpid;
+ if ((message = tree_get(&ramqueue.messages, msgid)) == NULL)
+ return;
+
+ TAILQ_INIT(&rmlist);
+ i = NULL;
+ while (tree_iter(&message->envelopes, &i, &evpid,
+ (void*)(&envelope))) {
+ if (envelope->flags & RQ_ENVELOPE_INFLIGHT)
+ continue;
+ tree_xset(&ramqueue.removed, evpid, &ramqueue);
+ TAILQ_REMOVE(envelope->queue, envelope, entry);
+ envelope->queue = &rmlist;
+ TAILQ_INSERT_HEAD(&rmlist, envelope, entry);
+ }
+ while((envelope = TAILQ_FIRST(&rmlist)))
+ rq_envelope_delete(&ramqueue, envelope);
+ }
}
-static struct ramqueue_envelope *
-ramqueue_lookup_envelope(u_int64_t evpid)
+static void
+sorted_insert(struct evplist *list, struct rq_envelope *evp)
{
- struct ramqueue_message *rq_msg;
- struct ramqueue_envelope evpkey;
+ struct rq_envelope *item;
- rq_msg = ramqueue_lookup_message(evpid_to_msgid(evpid));
- if (rq_msg == NULL)
- return NULL;
-
- evpkey.evpid = evpid;
- return RB_FIND(evptree, &rq_msg->evptree, &evpkey);
+ TAILQ_FOREACH(item, list, entry) {
+ if (evp->sched < item->sched) {
+ TAILQ_INSERT_BEFORE(item, evp, entry);
+ return;
+ }
+ }
+ TAILQ_INSERT_TAIL(list, evp, entry);
}
-static struct ramqueue_batch *
-ramqueue_lookup_batch(struct ramqueue_host *rq_host, u_int32_t msgid)
+static void
+sorted_merge(struct evplist *list, struct evplist *from)
{
- struct ramqueue_batch *rq_batch;
+ struct rq_envelope *e;
- TAILQ_FOREACH(rq_batch, &rq_host->batch_queue, batch_entry) {
- if (rq_batch->msgid == msgid)
- return rq_batch;
+ /* XXX this is O(not good enough) */
+ while ((e = TAILQ_LAST(from, evplist))) {
+ TAILQ_REMOVE(from, e, entry);
+ sorted_insert(list, e);
+ e->queue = list;
}
-
- return NULL;
}
-static int
-ramqueue_expire(struct envelope *envelope)
+static void
+rq_queue_init(struct rq_queue *rq)
{
- struct envelope bounce;
- struct scheduler_info si;
- time_t curtm;
-
- curtm = time(NULL);
- if (curtm - envelope->creation >= envelope->expire) {
- envelope_set_errormsg(envelope,
- "message expired after sitting in queue for %d days",
- envelope->expire / 60 / 60 / 24);
- bounce_record_message(envelope, &bounce);
-
- scheduler_info(&si, &bounce);
- scheduler_ramqueue_insert(&si);
-
- log_debug("#### %s: queue_envelope_delete: %016" PRIx64,
- __func__, envelope->id);
- queue_envelope_delete(envelope);
- return 1;
- }
- return 0;
+ bzero(rq, sizeof *rq);
+
+ tree_init(&rq->messages);
+ tree_init(&rq->expired);
+ tree_init(&rq->removed);
+ SPLAY_INIT(&rq->hosts);
+ TAILQ_INIT(&rq->mda);
+ TAILQ_INIT(&rq->mta);
+ TAILQ_INIT(&rq->bounce);
+ TAILQ_INIT(&rq->inflight);
}
-static time_t
-ramqueue_next_schedule(struct scheduler_info *si, time_t curtm)
-{
- time_t delay;
-
- if (si->lasttry == 0)
- return curtm;
-
- delay = SMTPD_QUEUE_MAXINTERVAL;
-
- if (si->type == D_MDA ||
- si->type == D_BOUNCE) {
- if (si->retry < 5)
- return curtm;
-
- if (si->retry < 15)
- delay = (si->retry * 60) + arc4random_uniform(60);
+static void
+rq_queue_merge(struct rq_queue *rq, struct rq_queue *update)
+{
+ struct rq_message *message, *tomessage;
+ struct rq_batch *batch, *tobatch;
+ struct rq_host *host, *tohost;
+ struct rq_envelope *envelope;
+ uint64_t id;
+ void *i;
+
+ /* merge host tree */
+ while ((host = SPLAY_ROOT(&update->hosts))) {
+ SPLAY_REMOVE(hosttree, &update->hosts, host);
+ tohost = rq_host_lookup(&rq->hosts, host->host);
+ if (tohost == NULL)
+ tohost = rq_host_create(&rq->hosts, host->host);
+ /* merge batches for current host */
+ while (tree_poproot(&host->batches, &id, (void*)&batch)) {
+ tobatch = tree_get(&tohost->batches, batch->msgid);
+ if (tobatch == NULL) {
+ /* batch does not exist. re-use structure */
+ batch->host = tohost;
+ tree_xset(&tohost->batches, batch->msgid, batch);
+ continue;
+ }
+ /* need to re-link all envelopes before merging them */
+ i = NULL;
+ while((tree_iter(&batch->envelopes, &i, &id,
+ (void*)&envelope)))
+ envelope->batch = tobatch;
+ tree_merge(&tobatch->envelopes, &batch->envelopes);
+ free(batch);
+ }
+ free(host);
}
- if (si->type == D_MTA) {
- if (si->retry < 3)
- delay = SMTPD_QUEUE_INTERVAL;
- else if (si->retry <= 7) {
- delay = SMTPD_QUEUE_INTERVAL * (1 << (si->retry - 3));
- if (delay > SMTPD_QUEUE_MAXINTERVAL)
- delay = SMTPD_QUEUE_MAXINTERVAL;
+ while (tree_poproot(&update->messages, &id, (void*)&message)) {
+ if ((tomessage = tree_get(&rq->messages, id)) == NULL) {
+ /* message does not exist. re-use structure */
+ tree_xset(&rq->messages, id, message);
+ continue;
}
+ /* need to re-link all envelopes before merging them */
+ i = NULL;
+ while((tree_iter(&message->envelopes, &i, &id,
+ (void*)&envelope)))
+ envelope->message = tomessage;
+ tree_merge(&tomessage->envelopes, &message->envelopes);
+ free(message);
}
- if (curtm >= si->lasttry + delay)
- return curtm;
+ sorted_merge(&rq->bounce, &update->bounce);
+ sorted_merge(&rq->mda, &update->mda);
+ sorted_merge(&rq->mta, &update->mta);
- return curtm + delay;
+ tree_merge(&rq->expired, &update->expired);
+ tree_merge(&rq->removed, &update->removed);
}
-static struct ramqueue_message *
-ramqueue_insert_message(u_int32_t msgid)
+static void
+rq_envelope_delete(struct rq_queue *rq, struct rq_envelope *envelope)
{
- struct ramqueue_message *rq_msg;
+ struct rq_message *message;
+ struct rq_batch *batch;
+ struct rq_host *host;
- rq_msg = calloc(1, sizeof (*rq_msg));
- if (rq_msg == NULL)
- fatal("calloc");
- rq_msg->msgid = msgid;
- RB_INSERT(msgtree, &ramqueue.msgtree, rq_msg);
- RB_INIT(&rq_msg->evptree);
- stat_increment(STATS_RAMQUEUE_MESSAGE);
+ if (envelope->flags & RQ_ENVELOPE_EXPIRED)
+ tree_pop(&rq->expired, envelope->evpid);
- return rq_msg;
-}
+ TAILQ_REMOVE(envelope->queue, envelope, entry);
+ batch = envelope->batch;
+ message = envelope->message;
+ host = batch->host;
-static struct ramqueue_host *
-ramqueue_insert_host(char *host)
-{
- struct ramqueue_host *rq_host;
-
- rq_host = calloc(1, sizeof (*rq_host));
- if (rq_host == NULL)
- fatal("calloc");
- strlcpy(rq_host->hostname, host, sizeof(rq_host->hostname));
- TAILQ_INIT(&rq_host->batch_queue);
- RB_INSERT(hosttree, &ramqueue.hosttree, rq_host);
- stat_increment(STATS_RAMQUEUE_HOST);
+ tree_xpop(&message->envelopes, envelope->evpid);
+ if (tree_empty(&message->envelopes)) {
+ tree_xpop(&rq->messages, message->msgid);
+ free(message);
+ }
- return rq_host;
+ tree_xpop(&batch->envelopes, envelope->evpid);
+ if (tree_empty(&batch->envelopes)) {
+ tree_xpop(&host->batches, message->msgid);
+ if (tree_empty(&host->batches)) {
+ SPLAY_REMOVE(hosttree, &rq->hosts, host);
+ free(host);
+ }
+ free(batch);
+ }
+ free(envelope);
}
-static struct ramqueue_batch *
-ramqueue_insert_batch(struct ramqueue_host *rq_host, u_int32_t msgid)
+static const char *
+rq_envelope_to_text(struct rq_envelope *e, time_t tref)
{
- struct ramqueue_batch *rq_batch;
+ static char buf[256];
+ char t[64];
- rq_batch = calloc(1, sizeof (*rq_batch));
- if (rq_batch == NULL)
- fatal("calloc");
- rq_batch->b_id = generate_uid();
- rq_batch->rq_host = rq_host;
- rq_batch->msgid = msgid;
+ snprintf(buf, sizeof buf, "evp:%016" PRIx64 " [", e->evpid);
- TAILQ_INIT(&rq_batch->envelope_queue);
- TAILQ_INSERT_TAIL(&rq_host->batch_queue, rq_batch, batch_entry);
+ if (e->type == D_BOUNCE)
+ strlcat(buf, "bounce", sizeof buf);
+ else if (e->type == D_MDA)
+ strlcat(buf, "mda", sizeof buf);
+ else if (e->type == D_MTA)
+ strlcat(buf, "mta", sizeof buf);
- stat_increment(STATS_RAMQUEUE_BATCH);
+ snprintf(t, sizeof t, ",sched=%s", duration_to_text(e->sched - tref));
+ strlcat(buf, t, sizeof buf);
+ snprintf(t, sizeof t, ",exp=%s", duration_to_text(e->expire - tref));
+ strlcat(buf, t, sizeof buf);
- return rq_batch;
-}
+ if (e->flags & RQ_ENVELOPE_EXPIRED)
+ strlcat(buf, ",expired", sizeof buf);
+ if (e->flags & RQ_ENVELOPE_INFLIGHT)
+ strlcat(buf, ",in-flight", sizeof buf);
-static void
-ramqueue_remove_host(struct ramqueue_host *rq_host)
-{
- RB_REMOVE(hosttree, &ramqueue.hosttree, rq_host);
- free(rq_host);
- stat_decrement(STATS_RAMQUEUE_HOST);
+ strlcat(buf, "]", sizeof buf);
+
+ return (buf);
}
static void
-ramqueue_remove_message(struct ramqueue_message *rq_msg)
-{
- RB_REMOVE(msgtree, &ramqueue.msgtree, rq_msg);
- free(rq_msg);
- stat_decrement(STATS_RAMQUEUE_MESSAGE);
-}
+rq_queue_dump(struct rq_queue *rq, const char * name, time_t tref)
+{
+ struct rq_host *host;
+ struct rq_batch *batch;
+ struct rq_message *message;
+ struct rq_envelope *envelope;
+ void *i, *j;
+ uint64_t id;
+
+ log_debug("/--- ramqueue: %s", name);
+ SPLAY_FOREACH(host, hosttree, &rq->hosts) {
+ log_debug("| host:%s", host->host);
+ i = NULL;
+ while((tree_iter(&host->batches, &i, &id, (void*)&batch))) {
+ log_debug("| batch:%08" PRIx32, batch->msgid);
+ j = NULL;
+ while((tree_iter(&batch->envelopes, &j, &id,
+ (void*)&envelope)))
+ log_debug("| %s",
+ rq_envelope_to_text(envelope, tref));
+ }
+ }
+ i = NULL;
+ while((tree_iter(&rq->messages, &i, &id, (void*)&message))) {
+ log_debug("| msg:%08" PRIx32, message->msgid);
+ j = NULL;
+ while((tree_iter(&message->envelopes, &j, &id,
+ (void*)&envelope)))
+ log_debug("| %s", rq_envelope_to_text(envelope, tref));
+ }
-static void
-ramqueue_remove_batch(struct ramqueue_host *rq_host,
- struct ramqueue_batch *rq_batch)
-{
- TAILQ_REMOVE(&rq_host->batch_queue, rq_batch, batch_entry);
- free(rq_batch);
- stat_decrement(STATS_RAMQUEUE_BATCH);
+ log_debug("| bounces:");
+ TAILQ_FOREACH(envelope, &rq->bounce, entry)
+ log_debug("| %s", rq_envelope_to_text(envelope, tref));
+ log_debug("| mda:");
+ TAILQ_FOREACH(envelope, &rq->mda, entry)
+ log_debug("| %s", rq_envelope_to_text(envelope, tref));
+ log_debug("| mta:");
+ TAILQ_FOREACH(envelope, &rq->mta, entry)
+ log_debug("| %s", rq_envelope_to_text(envelope, tref));
+ log_debug("| in-flight:");
+ TAILQ_FOREACH(envelope, &rq->inflight, entry)
+ log_debug("| %s", rq_envelope_to_text(envelope, tref));
+ log_debug("\\---");
}
static int
-ramqueue_host_cmp(struct ramqueue_host *h1, struct ramqueue_host *h2)
+rq_host_cmp(struct rq_host *a, struct rq_host *b)
{
- return strcmp(h1->hostname, h2->hostname);
+ return (strcmp(a->host, b->host));
}
-
-static int
-ramqueue_msg_cmp(struct ramqueue_message *m1, struct ramqueue_message *m2)
+static struct rq_host *
+rq_host_lookup(struct hosttree *host_tree, char *host)
{
- return (m1->msgid < m2->msgid ? -1 : m1->msgid > m2->msgid);
+ struct rq_host key;
+
+ strlcpy(key.host, host, sizeof key.host);
+ return (SPLAY_FIND(hosttree, host_tree, &key));
}
-static int
-ramqueue_evp_cmp(struct ramqueue_envelope *e1, struct ramqueue_envelope *e2)
+static struct rq_host *
+rq_host_create(struct hosttree *host_tree, char *host)
{
- return (e1->evpid < e2->evpid ? -1 : e1->evpid > e2->evpid);
-}
+ struct rq_host *rq_host;
-RB_GENERATE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp);
-RB_GENERATE(msgtree, ramqueue_message, msgtree_entry, ramqueue_msg_cmp);
-RB_GENERATE(evptree, ramqueue_envelope, evptree_entry, ramqueue_evp_cmp);
-RB_GENERATE(offloadtree, ramqueue_envelope, evptree_entry, ramqueue_evp_cmp);
+ rq_host = xcalloc(1, sizeof *rq_host, "rq_host_create");
+ tree_init(&rq_host->batches);
+ strlcpy(rq_host->host, host, sizeof rq_host->host);
+ SPLAY_INSERT(hosttree, host_tree, rq_host);
+
+ return (rq_host);
+}
+SPLAY_GENERATE(hosttree, rq_host, hosttree_entry, rq_host_cmp);
diff --git a/usr.sbin/smtpd/smtpd.h b/usr.sbin/smtpd/smtpd.h
index b98cf72f6f9..7941fd73bba 100644
--- a/usr.sbin/smtpd/smtpd.h
+++ b/usr.sbin/smtpd/smtpd.h
@@ -1,8 +1,9 @@
-/* $OpenBSD: smtpd.h,v 1.314 2012/08/07 21:47:57 eric Exp $ */
+/* $OpenBSD: smtpd.h,v 1.315 2012/08/08 08:50:42 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
* Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
+ * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
@@ -916,8 +917,8 @@ struct delivery_backend {
};
struct scheduler_info {
- u_int64_t evpid;
- char destination[MAXHOSTNAMELEN];
+ u_int64_t evpid;
+ char destination[MAXHOSTNAMELEN];
enum delivery_type type;
time_t creation;
@@ -926,30 +927,41 @@ struct scheduler_info {
u_int8_t retry;
};
+struct id_list {
+ struct id_list *next;
+ uint64_t id;
+};
+
+#define SCHED_NONE 0x00
+#define SCHED_DELAY 0x01
+#define SCHED_REMOVE 0x02
+#define SCHED_EXPIRE 0x04
+#define SCHED_BOUNCE 0x08
+#define SCHED_MDA 0x10
+#define SCHED_MTA 0x20
+
+struct scheduler_batch {
+ int type;
+ time_t delay;
+ struct id_list *evpids;
+};
+
struct scheduler_backend {
void (*init)(void);
- int (*setup)(void);
-
- int (*next)(u_int64_t *, time_t *);
void (*insert)(struct scheduler_info *);
- void (*schedule)(u_int64_t);
- void (*remove)(u_int64_t);
+ void (*commit)(u_int32_t);
+ void (*rollback)(u_int32_t);
- void *(*host)(char *);
- void *(*message)(u_int32_t);
- void *(*batch)(u_int64_t);
- void *(*queue)(void);
- void (*close)(void *);
+ void (*update)(struct scheduler_info *);
+ void (*delete)(u_int64_t);
- int (*fetch)(void *, u_int64_t *);
- int (*force)(u_int64_t);
+ void (*batch)(int, time_t, struct scheduler_batch *);
- void (*display)(void); /* may be NULL */
+ void (*schedule)(u_int64_t);
+ void (*remove)(u_int64_t);
};
-
-
extern struct smtpd *env;
extern void (*imsg_callback)(struct imsgev *, struct imsg *);
@@ -1100,13 +1112,11 @@ void qwalk_close(void *);
/* scheduler.c */
pid_t scheduler(void);
-void message_reset_flags(struct envelope *);
-
-/* scheduler.c */
+/* scheduler_backend.c */
struct scheduler_backend *scheduler_backend_lookup(const char *);
void scheduler_info(struct scheduler_info *, struct envelope *);
-
+time_t scheduler_compute_schedule(struct scheduler_info *);
/* smtp.c */
pid_t smtp(void);
@@ -1199,6 +1209,7 @@ int valid_localpart(const char *);
int valid_domainpart(const char *);
char *ss_to_text(struct sockaddr_storage *);
char *time_to_text(time_t);
+char *duration_to_text(time_t);
int secure_file(int, char *, char *, uid_t, int);
int lowercase(char *, char *, size_t);
void xlowercase(char *, char *, size_t);
diff --git a/usr.sbin/smtpd/util.c b/usr.sbin/smtpd/util.c
index 7000a366c1d..8fa0beff0c4 100644
--- a/usr.sbin/smtpd/util.c
+++ b/usr.sbin/smtpd/util.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: util.c,v 1.68 2012/08/07 21:47:58 eric Exp $ */
+/* $OpenBSD: util.c,v 1.69 2012/08/08 08:50:42 eric Exp $ */
/*
* Copyright (c) 2000,2001 Markus Friedl. All rights reserved.
@@ -538,6 +538,51 @@ time_to_text(time_t when)
return buf;
}
+char *
+duration_to_text(time_t t)
+{
+ static char dst[64];
+ char buf[64];
+ int d, h, m, s;
+
+ if (t == 0) {
+ strlcpy(dst, "0s", sizeof dst);
+ return (dst);
+ }
+
+ dst[0] = '\0';
+ if (t < 0) {
+ strlcpy(dst, "-", sizeof dst);
+ t = -t;
+ }
+
+ s = t % 60;
+ t /= 60;
+ m = t % 60;
+ t /= 60;
+ h = t % 24;
+ d = t / 24;
+
+ if (d) {
+ snprintf(buf, sizeof buf, "%id", d);
+ strlcat(dst, buf, sizeof dst);
+ }
+ if (h) {
+ snprintf(buf, sizeof buf, "%ih", h);
+ strlcat(dst, buf, sizeof dst);
+ }
+ if (m) {
+ snprintf(buf, sizeof buf, "%im", m);
+ strlcat(dst, buf, sizeof dst);
+ }
+ if (s) {
+ snprintf(buf, sizeof buf, "%is", s);
+ strlcat(dst, buf, sizeof dst);
+ }
+
+ return (dst);
+}
+
int
text_to_netaddr(struct netaddr *netaddr, char *s)
{