summaryrefslogtreecommitdiff
path: root/usr.sbin
diff options
context:
space:
mode:
authorEric Faurot <eric@cvs.openbsd.org>2012-08-08 08:50:43 +0000
committerEric Faurot <eric@cvs.openbsd.org>2012-08-08 08:50:43 +0000
commit4fb538ff7fe692fc3494a906f82178b14bf48bf1 (patch)
tree74c2a8e0bb894841c747e6bee85be58c05869135 /usr.sbin
parente653821d1952b330839a2b474720109458d1d375 (diff)
Improve the scheduler backend API.
New envelopes are pushed into the scheduler through the insert() commit() rollback() transactional interface functions. Worklists are pulled from the scheduler through a single batch() interface function, which returns a list of envelope ids and the type of processing. Envelopes returned in this batch are said to be "in-flight", as opposed to "pending". They are supposed to be processed in some way, and either updated() or deleted() at some point. The schedule()/remove() functions are used to alter the internal state of "pending" envelopes to make them schedulable. The enve- lopes will be part of a worklist on the next call to batch(). Rewrite the scheduler_ramqueue backend. The initial queue loading in now done by the queue. ok gilles@
Diffstat (limited to 'usr.sbin')
-rw-r--r--usr.sbin/smtpd/bounce.c8
-rw-r--r--usr.sbin/smtpd/queue.c58
-rw-r--r--usr.sbin/smtpd/scheduler.c454
-rw-r--r--usr.sbin/smtpd/scheduler_backend.c23
-rw-r--r--usr.sbin/smtpd/scheduler_ramqueue.c1199
-rw-r--r--usr.sbin/smtpd/smtpd.h55
-rw-r--r--usr.sbin/smtpd/util.c47
7 files changed, 896 insertions, 948 deletions
diff --git a/usr.sbin/smtpd/bounce.c b/usr.sbin/smtpd/bounce.c
index c2147d12297..f31b322ef29 100644
--- a/usr.sbin/smtpd/bounce.c
+++ b/usr.sbin/smtpd/bounce.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: bounce.c,v 1.42 2012/07/09 09:57:53 gilles Exp $ */
+/* $OpenBSD: bounce.c,v 1.43 2012/08/08 08:50:42 eric Exp $ */
/*
* Copyright (c) 2009 Gilles Chehade <gilles@openbsd.org>
@@ -60,6 +60,10 @@ struct bounce {
struct io io;
};
+/* XXX remove later */
+void scheduler_envelope_update(struct envelope *);
+void scheduler_envelope_delete(struct envelope *);
+
static void bounce_send(struct bounce *, const char *, ...);
static int bounce_next(struct bounce *);
static void bounce_status(struct bounce *, const char *, ...);
@@ -210,10 +214,12 @@ bounce_status(struct bounce *bounce, const char *fmt, ...)
log_debug("#### %s: queue_envelope_delete: %016" PRIx64,
__func__, bounce->evp.id);
queue_envelope_delete(&bounce->evp);
+ scheduler_envelope_delete(&bounce->evp);
} else {
bounce->evp.retry++;
envelope_set_errormsg(&bounce->evp, "%s", status);
queue_envelope_update(&bounce->evp);
+ scheduler_envelope_update(&bounce->evp);
}
bounce->evp.id = 0;
free(status);
diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c
index d0e9549607b..c8c2d31403c 100644
--- a/usr.sbin/smtpd/queue.c
+++ b/usr.sbin/smtpd/queue.c
@@ -1,8 +1,9 @@
-/* $OpenBSD: queue.c,v 1.121 2012/07/09 09:57:53 gilles Exp $ */
+/* $OpenBSD: queue.c,v 1.122 2012/08/08 08:50:42 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
* Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
+ * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
@@ -39,6 +40,7 @@
#include "log.h"
static void queue_imsg(struct imsgev *, struct imsg *);
+static void queue_timeout(int, short, void *);
static void queue_pass_to_scheduler(struct imsgev *, struct imsg *);
static void queue_shutdown(void);
static void queue_sig_handler(int, short, void *);
@@ -224,7 +226,8 @@ queue(void)
{
pid_t pid;
struct passwd *pw;
-
+ struct timeval tv;
+ struct event ev_qload;
struct event ev_sigint;
struct event ev_sigterm;
@@ -286,6 +289,12 @@ queue(void)
config_pipes(peers, nitems(peers));
config_peers(peers, nitems(peers));
+ /* setup queue loading task */
+ evtimer_set(&ev_qload, queue_timeout, &ev_qload);
+ tv.tv_sec = 0;
+ tv.tv_usec = 10;
+ evtimer_add(&ev_qload, &tv);
+
if (event_dispatch() < 0)
fatal("event_dispatch");
queue_shutdown();
@@ -294,6 +303,51 @@ queue(void)
}
void
+queue_timeout(int fd, short event, void *p)
+{
+ static struct qwalk *q = NULL;
+ struct event *ev = p;
+ static uint64_t last_evpid = 0;
+ struct envelope envelope;
+ struct timeval tv;
+ uint64_t evpid;
+
+ if (q == NULL) {
+ log_info("queue: loading queue into scheduler");
+ q = qwalk_new(0);
+ }
+
+ while (qwalk(q, &evpid)) {
+ if (! queue_envelope_load(evpid, &envelope))
+ continue;
+
+ if (evpid_to_msgid(evpid) != evpid_to_msgid(last_evpid) &&
+ last_evpid != 0) {
+ envelope.id = last_evpid;
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &envelope,
+ sizeof envelope);
+ }
+
+ last_evpid = evpid;
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+ evtimer_add(ev, &tv);
+ return;
+ }
+
+ if (last_evpid) {
+ envelope.id = last_evpid;
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &envelope,
+ sizeof envelope);
+ }
+
+ log_info("queue: done loading queue into scheduler");
+ qwalk_close(q);
+}
+
+void
queue_submit_envelope(struct envelope *ep)
{
imsg_compose_event(env->sc_ievs[PROC_QUEUE],
diff --git a/usr.sbin/smtpd/scheduler.c b/usr.sbin/smtpd/scheduler.c
index 490e4db546e..9efaf8029a4 100644
--- a/usr.sbin/smtpd/scheduler.c
+++ b/usr.sbin/smtpd/scheduler.c
@@ -1,9 +1,10 @@
-/* $OpenBSD: scheduler.c,v 1.7 2012/07/18 22:04:49 eric Exp $ */
+/* $OpenBSD: scheduler.c,v 1.8 2012/08/08 08:50:42 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
* Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
* Copyright (c) 2008-2009 Jacek Masiulaniec <jacekm@dobremiasto.net>
+ * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
@@ -47,17 +48,18 @@
static void scheduler_imsg(struct imsgev *, struct imsg *);
static void scheduler_shutdown(void);
static void scheduler_sig_handler(int, short, void *);
-static void scheduler_setup_events(void);
static void scheduler_reset_events(void);
-static void scheduler_disable_events(void);
static void scheduler_timeout(int, short, void *);
-static void scheduler_remove(u_int64_t);
-static void scheduler_remove_envelope(u_int64_t);
-static int scheduler_process_envelope(u_int64_t);
-static int scheduler_process_batch(enum delivery_type, u_int64_t);
-static int scheduler_check_loop(struct envelope *);
+static void scheduler_process_remove(struct scheduler_batch *);
+static void scheduler_process_expire(struct scheduler_batch *);
+static void scheduler_process_bounce(struct scheduler_batch *);
+static void scheduler_process_mda(struct scheduler_batch *);
+static void scheduler_process_mta(struct scheduler_batch *);
static int scheduler_load_message(u_int32_t);
+void scheduler_envelope_update(struct envelope *);
+void scheduler_envelope_delete(struct envelope *);
+
static struct scheduler_backend *backend = NULL;
extern const char *backend_scheduler;
@@ -84,8 +86,9 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg)
e = imsg->data;
log_trace(TRACE_SCHEDULER,
"scheduler: IMSG_QUEUE_DELIVERY_OK: %016"PRIx64, e->id);
- backend->remove(e->id);
+ backend->delete(e->id);
queue_envelope_delete(e);
+ scheduler_reset_events();
return;
case IMSG_QUEUE_DELIVERY_TEMPFAIL:
@@ -96,7 +99,7 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg)
e->retry++;
queue_envelope_update(e);
scheduler_info(&si, e);
- backend->insert(&si);
+ backend->update(&si);
scheduler_reset_events();
return;
@@ -109,10 +112,11 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg)
bounce_record_message(e, &bounce);
scheduler_info(&si, &bounce);
backend->insert(&si);
- scheduler_reset_events();
+ backend->commit(evpid_to_msgid(bounce.id));
}
- backend->remove(e->id);
+ backend->delete(e->id);
queue_envelope_delete(e);
+ scheduler_reset_events();
return;
case IMSG_MDA_SESS_NEW:
@@ -138,7 +142,7 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg)
if (imsg->fd < 0 || !bounce_session(imsg->fd, e)) {
queue_envelope_update(e);
scheduler_info(&si, e);
- backend->insert(&si);
+ backend->update(&si);
scheduler_reset_events();
return;
}
@@ -175,24 +179,23 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg)
log_trace(TRACE_SCHEDULER,
"scheduler: IMSG_SCHEDULER_SCHEDULE: %016"PRIx64,
*(u_int64_t *)imsg->data);
- backend->force(*(u_int64_t *)imsg->data);
- scheduler_reset_events();
+ backend->schedule(*(u_int64_t *)imsg->data);
+ scheduler_reset_events();
return;
case IMSG_SCHEDULER_REMOVE:
log_trace(TRACE_SCHEDULER,
"scheduler: IMSG_SCHEDULER_REMOVE: %016"PRIx64,
*(u_int64_t *)imsg->data);
- scheduler_remove(*(u_int64_t *)imsg->data);
+ backend->remove(*(u_int64_t *)imsg->data);
scheduler_reset_events();
return;
-
}
errx(1, "scheduler_imsg: unexpected %s imsg", imsg_to_str(imsg->hdr.type));
}
-void
+static void
scheduler_sig_handler(int sig, short event, void *p)
{
switch (sig) {
@@ -205,25 +208,14 @@ scheduler_sig_handler(int sig, short event, void *p)
}
}
-void
+static void
scheduler_shutdown(void)
{
log_info("scheduler handler exiting");
_exit(0);
}
-void
-scheduler_setup_events(void)
-{
- struct timeval tv;
-
- evtimer_set(&env->sc_ev, scheduler_timeout, NULL);
- tv.tv_sec = 0;
- tv.tv_usec = 10;
- evtimer_add(&env->sc_ev, &tv);
-}
-
-void
+static void
scheduler_reset_events(void)
{
struct timeval tv;
@@ -233,12 +225,6 @@ scheduler_reset_events(void)
evtimer_add(&env->sc_ev, &tv);
}
-void
-scheduler_disable_events(void)
-{
- evtimer_del(&env->sc_ev);
-}
-
pid_t
scheduler(void)
{
@@ -304,200 +290,198 @@ scheduler(void)
config_pipes(peers, nitems(peers));
config_peers(peers, nitems(peers));
- scheduler_setup_events();
+ evtimer_set(&env->sc_ev, scheduler_timeout, NULL);
+ scheduler_reset_events();
event_dispatch();
- scheduler_disable_events();
scheduler_shutdown();
return (0);
}
-void
+static void
scheduler_timeout(int fd, short event, void *p)
{
- time_t nsched;
- time_t curtm;
- u_int64_t evpid;
- static int setup = 0;
- int delay = 0;
- struct timeval tv;
-
- log_trace(TRACE_SCHEDULER, "scheduler: entering scheduler_timeout");
-
- /* if we're not done setting up the scheduler, do it some more */
- if (! setup)
- setup = backend->setup();
-
- /* we don't have a schedulable envelope ... sleep */
- if (! backend->next(&evpid, &nsched))
- goto scheduler_sleep;
-
- /* is the envelope schedulable right away ? */
- curtm = time(NULL);
- if (nsched <= curtm) {
- /* yup */
- scheduler_process_envelope(evpid);
- }
- else {
- /* nope, so we can either keep the timeout delay to 0 if we
- * are not done setting up the scheduler, or sleep until it
- * is time to schedule that envelope otherwise.
- */
- if (setup)
- delay = nsched - curtm;
- }
+ struct timeval tv;
+ struct scheduler_batch batch;
+ int typemask;
- if (delay)
- log_trace(TRACE_SCHEDULER, "scheduler: pausing for %d seconds",
- delay);
- tv.tv_sec = delay;
- tv.tv_usec = 0;
- evtimer_add(&env->sc_ev, &tv);
- return;
+ log_trace(TRACE_SCHEDULER, "scheduler: getting next batch");
-scheduler_sleep:
- log_trace(TRACE_SCHEDULER, "scheduler: sleeping");
- return;
-}
-
-static int
-scheduler_process_envelope(u_int64_t evpid)
-{
- struct envelope envelope;
- size_t mta_av, mda_av, bnc_av;
- struct scheduler_info si;
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
- mta_av = env->sc_maxconn - stat_get(STATS_MTA_SESSION, STAT_ACTIVE);
- mda_av = env->sc_maxconn - stat_get(STATS_MDA_SESSION, STAT_ACTIVE);
- bnc_av = env->sc_maxconn - stat_get(STATS_SCHEDULER_BOUNCES, STAT_ACTIVE);
+ typemask = SCHED_REMOVE | SCHED_EXPIRE | SCHED_BOUNCE;
+ if (!(env->sc_flags & SMTPD_MDA_PAUSED))
+ typemask |= SCHED_MDA;
+ if (!(env->sc_flags & SMTPD_MTA_PAUSED))
+ typemask |= SCHED_MTA;
- if (! queue_envelope_load(evpid, &envelope))
- return 0;
+ backend->batch(typemask, time(NULL), &batch);
+ switch (batch.type) {
+ case SCHED_NONE:
+ log_trace(TRACE_SCHEDULER, "scheduler: sleeping");
+ return;
- if (envelope.type == D_MDA)
- if (mda_av == 0) {
- env->sc_flags |= SMTPD_MDA_BUSY;
- return 0;
- }
+ case SCHED_DELAY:
+ tv.tv_sec = batch.delay;
+ log_trace(TRACE_SCHEDULER,
+ "scheduler: pausing for %li seconds", tv.tv_sec);
+ break;
- if (envelope.type == D_MTA)
- if (mta_av == 0) {
- env->sc_flags |= SMTPD_MTA_BUSY;
- return 0;
- }
+ case SCHED_REMOVE:
+ scheduler_process_remove(&batch);
+ break;
- if (envelope.type == D_BOUNCE)
- if (bnc_av == 0) {
- env->sc_flags |= SMTPD_BOUNCE_BUSY;
- return 0;
- }
+ case SCHED_EXPIRE:
+ scheduler_process_expire(&batch);
+ break;
- if (scheduler_check_loop(&envelope)) {
- struct envelope bounce;
+ case SCHED_BOUNCE:
+ scheduler_process_bounce(&batch);
+ break;
- envelope_set_errormsg(&envelope, "loop has been detected");
- if (bounce_record_message(&envelope, &bounce)) {
- scheduler_info(&si, &bounce);
- backend->insert(&si);
- }
- backend->remove(evpid);
- queue_envelope_delete(&envelope);
+ case SCHED_MDA:
+ scheduler_process_mda(&batch);
+ break;
- scheduler_reset_events();
+ case SCHED_MTA:
+ scheduler_process_mta(&batch);
+ break;
- return 0;
+ default:
+ fatalx("scheduler_timeout: unknown batch type");
}
+ evtimer_add(&env->sc_ev, &tv);
+}
- return scheduler_process_batch(envelope.type, evpid);
+static void
+scheduler_process_remove(struct scheduler_batch *batch)
+{
+ struct envelope evp;
+ struct id_list *e;
+
+ while ((e = batch->evpids)) {
+ batch->evpids = e->next;
+ log_debug("scheduler: deleting evp:%016" PRIx64 " (removed)",
+ e->id);
+ evp.id = e->id;
+ queue_envelope_delete(&evp);
+ free(e);
+ }
}
-static int
-scheduler_process_batch(enum delivery_type type, u_int64_t evpid)
+static void
+scheduler_process_expire(struct scheduler_batch *batch)
{
struct envelope evp;
- void *batch;
- int fd;
-
- batch = backend->batch(evpid);
- switch (type) {
- case D_BOUNCE:
- while (backend->fetch(batch, &evpid)) {
- if (! queue_envelope_load(evpid, &evp))
- goto end;
-
- evp.lasttry = time(NULL);
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_SMTP_ENQUEUE, PROC_SMTP, 0, -1, &evp,
- sizeof evp);
- backend->schedule(evpid);
- }
+ struct id_list *e;
+
+ while ((e = batch->evpids)) {
+ batch->evpids = e->next;
+ log_debug("scheduler: deleting evp:%016" PRIx64 " (expire)",
+ e->id);
+ evp.id = e->id;
+ queue_envelope_delete(&evp);
+ free(e);
+ }
+}
+
+static void
+scheduler_process_bounce(struct scheduler_batch *batch)
+{
+ struct envelope evp;
+ struct id_list *e;
+
+ while ((e = batch->evpids)) {
+ batch->evpids = e->next;
+ log_debug("scheduler: scheduling evp:%016" PRIx64 " (bounce)",
+ e->id);
+ queue_envelope_load(e->id, &evp);
+ evp.lasttry = time(NULL);
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE],
+ IMSG_SMTP_ENQUEUE, PROC_SMTP, 0, -1, &evp,
+ sizeof evp);
stat_increment(STATS_SCHEDULER);
stat_increment(STATS_SCHEDULER_BOUNCES);
- break;
-
- case D_MDA:
- backend->fetch(batch, &evpid);
- if (! queue_envelope_load(evpid, &evp))
- goto end;
-
+ free(e);
+ }
+}
+
+static void
+scheduler_process_mda(struct scheduler_batch *batch)
+{
+ struct envelope evp;
+ struct id_list *e;
+ int fd;
+
+ while ((e = batch->evpids)) {
+ batch->evpids = e->next;
+ log_debug("scheduler: scheduling evp:%016" PRIx64 " (mda)",
+ e->id);
+ queue_envelope_load(e->id, &evp);
evp.lasttry = time(NULL);
- fd = queue_message_fd_r(evpid_to_msgid(evpid));
+ fd = queue_message_fd_r(evpid_to_msgid(evp.id));
imsg_compose_event(env->sc_ievs[PROC_QUEUE],
IMSG_MDA_SESS_NEW, PROC_MDA, 0, fd, &evp,
sizeof evp);
- backend->schedule(evpid);
-
stat_increment(STATS_SCHEDULER);
stat_increment(STATS_MDA_SESSION);
- break;
+ free(e);
+ }
+}
- case D_MTA: {
- struct mta_batch mta_batch;
+static void
+scheduler_process_mta(struct scheduler_batch *batch)
+{
+ struct envelope evp;
+ struct mta_batch mta_batch;
+ struct id_list *e;
- /* FIXME */
- if (! backend->fetch(batch, &evpid))
- goto end;
- if (! queue_envelope_load(evpid, &evp))
- goto end;
+ queue_envelope_load(batch->evpids->id, &evp);
- bzero(&mta_batch, sizeof mta_batch);
- mta_batch.id = arc4random();
- mta_batch.relay = evp.agent.mta.relay;
+ bzero(&mta_batch, sizeof mta_batch);
+ mta_batch.id = arc4random();
+ mta_batch.relay = evp.agent.mta.relay;
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_BATCH_CREATE, PROC_MTA, 0, -1, &mta_batch,
- sizeof mta_batch);
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE],
+ IMSG_BATCH_CREATE, PROC_MTA, 0, -1, &mta_batch,
+ sizeof mta_batch);
- while (backend->fetch(batch, &evpid)) {
- if (! queue_envelope_load(evpid, &evp))
- goto end;
- evp.lasttry = time(NULL); /* FIXME */
- evp.batch_id = mta_batch.id;
+ while ((e = batch->evpids)) {
+ batch->evpids = e->next;
+ log_debug("scheduler: scheduling evp:%016" PRIx64 " (mta)",
+ e->id);
+ queue_envelope_load(e->id, &evp);
+ evp.lasttry = time(NULL);
+ evp.batch_id = mta_batch.id;
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_BATCH_APPEND,
+ PROC_MTA, 0, -1, &evp, sizeof evp);
+ free(e);
+ stat_increment(STATS_SCHEDULER);
+ }
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_BATCH_APPEND, PROC_MTA, 0, -1, &evp,
- sizeof evp);
+ imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_BATCH_CLOSE,
+ PROC_MTA, 0, -1, &mta_batch, sizeof mta_batch);
- backend->schedule(evpid);
- stat_increment(STATS_SCHEDULER);
- }
+ stat_increment(STATS_MTA_SESSION);
+}
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_BATCH_CLOSE, PROC_MTA, 0, -1, &mta_batch,
- sizeof mta_batch);
+void
+scheduler_envelope_update(struct envelope *e)
+{
+ struct scheduler_info si;
- stat_increment(STATS_MTA_SESSION);
- break;
- }
-
- default:
- fatalx("scheduler_process_batchqueue: unknown type");
- }
+ scheduler_info(&si, e);
+ backend->update(&si);
+ scheduler_reset_events();
+}
-end:
- backend->close(batch);
- return 1;
+void
+scheduler_envelope_delete(struct envelope *e)
+{
+ backend->delete(e->id);
+ scheduler_reset_events();
}
static int
@@ -506,7 +490,7 @@ scheduler_load_message(u_int32_t msgid)
struct qwalk *q;
u_int64_t evpid;
struct envelope envelope;
- struct scheduler_info si;
+ struct scheduler_info si;
q = qwalk_new(msgid);
while (qwalk(q, &evpid)) {
@@ -515,98 +499,8 @@ scheduler_load_message(u_int32_t msgid)
scheduler_info(&si, &envelope);
backend->insert(&si);
}
- qwalk_close(q);
+ qwalk_close(q);
+ backend->commit(msgid);
return 1;
}
-
-static int
-scheduler_check_loop(struct envelope *ep)
-{
- int fd;
- FILE *fp;
- char *buf, *lbuf;
- size_t len;
- struct mailaddr maddr;
- int ret = 0;
- int rcvcount = 0;
-
- fd = queue_message_fd_r(evpid_to_msgid(ep->id));
- if ((fp = fdopen(fd, "r")) == NULL)
- fatal("fdopen");
-
- lbuf = NULL;
- while ((buf = fgetln(fp, &len))) {
- if (buf[len - 1] == '\n')
- buf[len - 1] = '\0';
- else {
- /* EOF without EOL, copy and add the NUL */
- if ((lbuf = malloc(len + 1)) == NULL)
- err(1, NULL);
- memcpy(lbuf, buf, len);
- lbuf[len] = '\0';
- buf = lbuf;
- }
-
- if (strchr(buf, ':') == NULL && !isspace((int)*buf))
- break;
-
- if (strncasecmp("Received: ", buf, 10) == 0) {
- rcvcount++;
- if (rcvcount == MAX_HOPS_COUNT) {
- ret = 1;
- break;
- }
- }
-
- else if (strncasecmp("Delivered-To: ", buf, 14) == 0) {
- struct mailaddr dest;
-
- bzero(&maddr, sizeof (struct mailaddr));
- if (! email_to_mailaddr(&maddr, buf + 14))
- continue;
-
- dest = ep->dest;
- if (ep->type == D_BOUNCE)
- dest = ep->sender;
-
- if (strcasecmp(maddr.user, dest.user) == 0 &&
- strcasecmp(maddr.domain, dest.domain) == 0) {
- ret = 1;
- break;
- }
- }
- }
- free(lbuf);
-
- fclose(fp);
- return ret;
-}
-
-static void
-scheduler_remove(u_int64_t id)
-{
- void *msg;
-
- /* removing by evpid */
- if (id > 0xffffffffL) {
- scheduler_remove_envelope(id);
- return;
- }
-
- /* removing by msgid */
- msg = backend->message(id);
- while (backend->fetch(msg, &id))
- scheduler_remove_envelope(id);
- backend->close(msg);
-}
-
-static void
-scheduler_remove_envelope(u_int64_t evpid)
-{
- struct envelope evp;
-
- evp.id = evpid;
- queue_envelope_delete(&evp);
- backend->remove(evpid);
-}
diff --git a/usr.sbin/smtpd/scheduler_backend.c b/usr.sbin/smtpd/scheduler_backend.c
index e22411fb7d9..7dafbf488b4 100644
--- a/usr.sbin/smtpd/scheduler_backend.c
+++ b/usr.sbin/smtpd/scheduler_backend.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: scheduler_backend.c,v 1.2 2012/07/28 12:06:41 chl Exp $ */
+/* $OpenBSD: scheduler_backend.c,v 1.3 2012/08/08 08:50:42 eric Exp $ */
/*
* Copyright (c) 2012 Gilles Chehade <gilles@openbsd.org>
@@ -57,3 +57,24 @@ scheduler_info(struct scheduler_info *sched, struct envelope *evp)
sched->expire = evp->expire;
sched->retry = evp->retry;
}
+
+time_t
+scheduler_compute_schedule(struct scheduler_info *sched)
+{
+ time_t delay;
+
+ if (sched->retry == 0)
+ delay = 0;
+#if 0
+ /* for testing scheduler sleep */
+ delay == arc4random() % 30;
+#endif
+ else if (sched->retry < 4)
+ delay = (sched->retry * 15 * 60);
+ else if (sched->retry < 8)
+ delay = ((sched->retry - 3) * 60 * 60);
+ else
+ delay = ((sched->retry - 7) * 24 * 60 * 60);
+
+ return (sched->creation + delay);
+}
diff --git a/usr.sbin/smtpd/scheduler_ramqueue.c b/usr.sbin/smtpd/scheduler_ramqueue.c
index a2ab2f1254d..07717fcd616 100644
--- a/usr.sbin/smtpd/scheduler_ramqueue.c
+++ b/usr.sbin/smtpd/scheduler_ramqueue.c
@@ -1,7 +1,8 @@
-/* $OpenBSD: scheduler_ramqueue.c,v 1.11 2012/07/10 11:13:40 gilles Exp $ */
+/* $OpenBSD: scheduler_ramqueue.c,v 1.12 2012/08/08 08:50:42 eric Exp $ */
/*
* Copyright (c) 2012 Gilles Chehade <gilles@openbsd.org>
+ * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
@@ -36,787 +37,703 @@
#include "smtpd.h"
#include "log.h"
+SPLAY_HEAD(hosttree, rq_host);
+TAILQ_HEAD(evplist, rq_envelope);
-struct ramqueue_host {
- RB_ENTRY(ramqueue_host) hosttree_entry;
- TAILQ_HEAD(,ramqueue_batch) batch_queue;
- char hostname[MAXHOSTNAMELEN];
+struct rq_host {
+ SPLAY_ENTRY(rq_host) hosttree_entry;
+ char host[MAXHOSTNAMELEN];
+ struct tree batches;
};
-struct ramqueue_batch {
- enum delivery_type type;
- TAILQ_ENTRY(ramqueue_batch) batch_entry;
- TAILQ_HEAD(,ramqueue_envelope) envelope_queue;
- struct ramqueue_host *rq_host;
- u_int64_t b_id;
- u_int32_t msgid;
- u_int32_t evpcnt;
-};
-struct ramqueue_envelope {
- TAILQ_ENTRY(ramqueue_envelope) queue_entry;
- TAILQ_ENTRY(ramqueue_envelope) batchqueue_entry;
- RB_ENTRY(ramqueue_envelope) evptree_entry;
- struct ramqueue_batch *rq_batch;
- struct ramqueue_message *rq_msg;
- struct ramqueue_host *rq_host;
- u_int64_t evpid;
- time_t sched;
-};
-struct ramqueue_message {
- RB_ENTRY(ramqueue_message) msgtree_entry;
- RB_HEAD(evptree, ramqueue_envelope) evptree;
- u_int32_t msgid;
- u_int32_t evpcnt;
+
+struct rq_message {
+ uint32_t msgid;
+ struct tree envelopes;
};
-struct ramqueue {
- RB_HEAD(hosttree, ramqueue_host) hosttree;
- RB_HEAD(msgtree, ramqueue_message) msgtree;
- RB_HEAD(offloadtree, ramqueue_envelope) offloadtree;
- TAILQ_HEAD(,ramqueue_envelope) queue;
+
+struct rq_batch {
+ uint32_t msgid;
+ struct tree envelopes;
+ struct rq_host *host;
};
-RB_PROTOTYPE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp);
-RB_PROTOTYPE(msgtree, ramqueue_message, msg_entry, ramqueue_msg_cmp);
-RB_PROTOTYPE(evptree, ramqueue_envelope, evp_entry, ramqueue_evp_cmp);
-RB_PROTOTYPE(offloadtree, ramqueue_envelope, evp_entry, ramqueue_evp_cmp);
+struct rq_envelope {
+ TAILQ_ENTRY(rq_envelope) entry;
+
+ uint64_t evpid;
+ int type;
+
+#define RQ_ENVELOPE_INFLIGHT 0x01
+#define RQ_ENVELOPE_EXPIRED 0x02
+ uint8_t flags;
-enum ramqueue_iter_type {
- RAMQUEUE_ITER_HOST,
- RAMQUEUE_ITER_BATCH,
- RAMQUEUE_ITER_MESSAGE,
- RAMQUEUE_ITER_QUEUE
+ time_t sched;
+ time_t expire;
+
+ struct rq_message *message;
+ struct rq_batch *batch;
+ struct evplist *queue;
};
-struct ramqueue_iter {
- enum ramqueue_iter_type type;
- union {
- struct ramqueue_host *host;
- struct ramqueue_batch *batch;
- struct ramqueue_message *message;
- } u;
+struct rq_queue {
+ struct hosttree hosts;
+ struct tree messages;
+ struct evplist mda;
+ struct evplist mta;
+ struct evplist bounce;
+ struct evplist inflight;
+ struct tree expired;
+ struct tree removed;
};
+static void scheduler_ramqueue_init(void);
+static void scheduler_ramqueue_insert(struct scheduler_info *);
+static void scheduler_ramqueue_commit(uint32_t);
+static void scheduler_ramqueue_rollback(uint32_t);
+static void scheduler_ramqueue_update(struct scheduler_info *);
+static void scheduler_ramqueue_delete(uint64_t);
+static void scheduler_ramqueue_batch(int, time_t, struct scheduler_batch *);
+static void scheduler_ramqueue_schedule(uint64_t);
+static void scheduler_ramqueue_remove(uint64_t);
+
+static int scheduler_ramqueue_next(int, uint64_t *, time_t *);
+static void sorted_insert(struct evplist *, struct rq_envelope *);
+static void sorted_merge(struct evplist *, struct evplist *);
+
-static int ramqueue_host_cmp(struct ramqueue_host *, struct ramqueue_host *);
-static int ramqueue_msg_cmp(struct ramqueue_message *, struct ramqueue_message *);
-static int ramqueue_evp_cmp(struct ramqueue_envelope *, struct ramqueue_envelope *);
-static struct ramqueue_host *ramqueue_lookup_host(char *);
-static struct ramqueue_host *ramqueue_insert_host(char *);
-static void ramqueue_remove_host(struct ramqueue_host *);
-static struct ramqueue_batch *ramqueue_lookup_batch(struct ramqueue_host *,
- u_int32_t);
-static struct ramqueue_batch *ramqueue_insert_batch(struct ramqueue_host *,
- u_int32_t);
-static void ramqueue_remove_batch(struct ramqueue_host *, struct ramqueue_batch *);
-static struct ramqueue_message *ramqueue_lookup_message(u_int32_t);
-static struct ramqueue_message *ramqueue_insert_message(u_int32_t);
-static void ramqueue_remove_message(struct ramqueue_message *);
-
-static struct ramqueue_envelope *ramqueue_lookup_envelope(u_int64_t);
-static struct ramqueue_envelope *ramqueue_lookup_offload(u_int64_t);
-
-
-/*NEEDSFIX*/
-static int ramqueue_expire(struct envelope *);
-static time_t ramqueue_next_schedule(struct scheduler_info *, time_t);
-
-static void scheduler_ramqueue_init(void);
-static int scheduler_ramqueue_setup(void);
-static int scheduler_ramqueue_next(u_int64_t *, time_t *);
-static void scheduler_ramqueue_insert(struct scheduler_info *);
-static void scheduler_ramqueue_schedule(u_int64_t);
-static void scheduler_ramqueue_remove(u_int64_t);
-static void *scheduler_ramqueue_host(char *);
-static void *scheduler_ramqueue_message(u_int32_t);
-static void *scheduler_ramqueue_batch(u_int64_t);
-static void *scheduler_ramqueue_queue(void);
-static void scheduler_ramqueue_close(void *);
-static int scheduler_ramqueue_fetch(void *, u_int64_t *);
-static int scheduler_ramqueue_force(u_int64_t);
-static void scheduler_ramqueue_display(void);
+static void rq_queue_init(struct rq_queue *);
+static void rq_queue_merge(struct rq_queue *, struct rq_queue *);
+static void rq_queue_dump(struct rq_queue *, const char *, time_t);
+static void rq_envelope_delete(struct rq_queue *, struct rq_envelope *);
+static const char *rq_envelope_to_text(struct rq_envelope *, time_t);
+static struct rq_host *rq_host_lookup(struct hosttree *, char *);
+static struct rq_host *rq_host_create(struct hosttree *, char *);
+static int rq_host_cmp(struct rq_host *, struct rq_host *);
+
+SPLAY_PROTOTYPE(hosttree, rq_host, hosttree_entry, rq_host_cmp);
struct scheduler_backend scheduler_backend_ramqueue = {
scheduler_ramqueue_init,
- scheduler_ramqueue_setup,
- scheduler_ramqueue_next,
- scheduler_ramqueue_insert,
- scheduler_ramqueue_schedule,
- scheduler_ramqueue_remove,
- scheduler_ramqueue_host,
- scheduler_ramqueue_message,
- scheduler_ramqueue_batch,
- scheduler_ramqueue_queue,
- scheduler_ramqueue_close,
- scheduler_ramqueue_fetch,
- scheduler_ramqueue_force,
- scheduler_ramqueue_display
-};
-static struct ramqueue ramqueue;
-static void
-scheduler_ramqueue_display_hosttree(void)
-{
- struct ramqueue_host *rq_host;
- struct ramqueue_batch *rq_batch;
- struct ramqueue_envelope *rq_evp;
-
- log_debug("\tscheduler_ramqueue: hosttree display");
- RB_FOREACH(rq_host, hosttree, &ramqueue.hosttree) {
- log_debug("\t\thost: [%p] %s", rq_host, rq_host->hostname);
- TAILQ_FOREACH(rq_batch, &rq_host->batch_queue, batch_entry) {
- log_debug("\t\t\tbatch: [%p] %016x",
- rq_batch, rq_batch->msgid);
- TAILQ_FOREACH(rq_evp, &rq_batch->envelope_queue,
- batchqueue_entry) {
- log_debug("\t\t\t\tevpid: [%p] %016"PRIx64,
- rq_evp, rq_evp->evpid);
- }
- }
- }
-}
+ scheduler_ramqueue_insert,
+ scheduler_ramqueue_commit,
+ scheduler_ramqueue_rollback,
-static void
-scheduler_ramqueue_display_msgtree(void)
-{
- struct ramqueue_message *rq_msg;
- struct ramqueue_envelope *rq_evp;
-
- log_debug("\tscheduler_ramqueue: msgtree display");
- RB_FOREACH(rq_msg, msgtree, &ramqueue.msgtree) {
- log_debug("\t\tmsg: [%p] %016x", rq_msg, rq_msg->msgid);
- RB_FOREACH(rq_evp, evptree, &rq_msg->evptree) {
- log_debug("\t\t\tevp: [%p] %016"PRIx64,
- rq_evp, rq_evp->evpid);
- }
- }
-}
+ scheduler_ramqueue_update,
+ scheduler_ramqueue_delete,
-static void
-scheduler_ramqueue_display_offloadtree(void)
-{
- struct ramqueue_envelope *rq_evp;
+ scheduler_ramqueue_batch,
- log_debug("\tscheduler_ramqueue: offloadtree display");
- RB_FOREACH(rq_evp, offloadtree, &ramqueue.offloadtree) {
- log_debug("\t\t\tevp: [%p] %016"PRIx64,
- rq_evp, rq_evp->evpid);
- }
-}
+ scheduler_ramqueue_schedule,
+ scheduler_ramqueue_remove,
+};
-static void
-scheduler_ramqueue_display_queue(void)
-{
- struct ramqueue_envelope *rq_evp;
+static struct rq_queue ramqueue;
+static struct tree updates;
- log_debug("\tscheduler_ramqueue: queue display");
- TAILQ_FOREACH(rq_evp, &ramqueue.queue, queue_entry) {
- log_debug("\t\tevpid: [%p] [batch: %p], %016"PRIx64,
- rq_evp, rq_evp->rq_batch, rq_evp->evpid);
- }
-}
-
-static void
-scheduler_ramqueue_display(void)
-{
- log_debug("scheduler_ramqueue: display");
- scheduler_ramqueue_display_hosttree();
- scheduler_ramqueue_display_msgtree();
- scheduler_ramqueue_display_offloadtree();
- scheduler_ramqueue_display_queue();
-}
+extern int verbose;
static void
scheduler_ramqueue_init(void)
{
- log_debug("scheduler_ramqueue: init");
- bzero(&ramqueue, sizeof (ramqueue));
- TAILQ_INIT(&ramqueue.queue);
- RB_INIT(&ramqueue.hosttree);
- RB_INIT(&ramqueue.msgtree);
- RB_INIT(&ramqueue.offloadtree);
+ rq_queue_init(&ramqueue);
+ tree_init(&updates);
}
-static int
-scheduler_ramqueue_setup(void)
+static void
+scheduler_ramqueue_insert(struct scheduler_info *si)
{
- struct envelope envelope;
- static struct qwalk *q = NULL;
- u_int64_t evpid;
- struct scheduler_info si;
-
- log_debug("scheduler_ramqueue: load");
-
- log_info("scheduler_ramqueue: queue loading in progress");
- if (q == NULL)
- q = qwalk_new(0);
-
- while (qwalk(q, &evpid)) {
- /* the envelope is already in ramqueue, skip */
- if (ramqueue_lookup_envelope(evpid) ||
- ramqueue_lookup_offload(evpid))
- continue;
-
- if (! queue_envelope_load(evpid, &envelope)) {
- log_debug("scheduler_ramqueue: evp -> /corrupt");
- queue_message_corrupt(evpid_to_msgid(evpid));
- continue;
- }
- if (ramqueue_expire(&envelope))
- continue;
+ uint32_t msgid;
+ struct rq_queue *update;
+ struct rq_host *host;
+ struct rq_message *message;
+ struct rq_batch *batch;
+ struct rq_envelope *envelope;
- scheduler_info(&si, &envelope);
- scheduler_ramqueue_insert(&si);
+ msgid = evpid_to_msgid(si->evpid);
- log_debug("ramqueue: loading interrupted");
- return (0);
+ /* find/prepare a ramqueue update */
+ if ((update = tree_get(&updates, msgid)) == NULL) {
+ update = xcalloc(1, sizeof *update, "scheduler_insert");
+ rq_queue_init(update);
+ tree_xset(&updates, msgid, update);
}
- qwalk_close(q);
- q = NULL;
- log_debug("ramqueue: loading over");
- return (1);
-}
-static int
-scheduler_ramqueue_next(u_int64_t *evpid, time_t *sched)
-{
- struct ramqueue_envelope *rq_evp = NULL;
+ /* find/prepare the host in ramqueue update */
+ if ((host = rq_host_lookup(&update->hosts, si->destination)) == NULL)
+ host = rq_host_create(&update->hosts, si->destination);
- log_debug("scheduler_ramqueue: next");
- TAILQ_FOREACH(rq_evp, &ramqueue.queue, queue_entry) {
- if (rq_evp->rq_batch->type == D_MDA)
- if (env->sc_flags & (SMTPD_MDA_PAUSED|SMTPD_MDA_BUSY))
- continue;
- if (rq_evp->rq_batch->type == D_MTA)
- if (env->sc_flags & (SMTPD_MTA_PAUSED|SMTPD_MTA_BUSY))
- continue;
- if (evpid)
- *evpid = rq_evp->evpid;
- if (sched)
- *sched = rq_evp->sched;
- log_debug("scheduler_ramqueue: next: found");
- return 1;
+ /* find/prepare the hosttree message in ramqueue update */
+ if ((batch = tree_get(&host->batches, msgid)) == NULL) {
+ batch = xcalloc(1, sizeof *batch, "scheduler_insert");
+ batch->msgid = msgid;
+ tree_init(&batch->envelopes);
+ tree_xset(&host->batches, msgid, batch);
}
- log_debug("scheduler_ramqueue: next: nothing schedulable");
- return 0;
-}
-
-static void
-scheduler_ramqueue_insert(struct scheduler_info *si)
-{
- struct ramqueue_host *rq_host;
- struct ramqueue_message *rq_msg;
- struct ramqueue_batch *rq_batch;
- struct ramqueue_envelope *rq_evp, *evp;
- u_int32_t msgid;
- time_t curtm = time(NULL);
-
- log_debug("scheduler_ramqueue: insert");
-
- rq_evp = ramqueue_lookup_offload(si->evpid);
- if (rq_evp) {
- rq_msg = rq_evp->rq_msg;
- rq_batch = rq_evp->rq_batch;
- rq_host = rq_evp->rq_host;
- RB_REMOVE(offloadtree, &ramqueue.offloadtree, rq_evp);
- }
- else {
- msgid = evpid_to_msgid(si->evpid);
- rq_msg = ramqueue_lookup_message(msgid);
- if (rq_msg == NULL)
- rq_msg = ramqueue_insert_message(msgid);
-
- rq_host = ramqueue_lookup_host(si->destination);
- if (rq_host == NULL)
- rq_host = ramqueue_insert_host(si->destination);
-
- rq_batch = ramqueue_lookup_batch(rq_host, msgid);
- if (rq_batch == NULL)
- rq_batch = ramqueue_insert_batch(rq_host, msgid);
-
- rq_evp = calloc(1, sizeof (*rq_evp));
- if (rq_evp == NULL)
- fatal("calloc");
- rq_evp->evpid = si->evpid;
- rq_batch->evpcnt++;
- rq_msg->evpcnt++;
+ /* find/prepare the msgtree message in ramqueue update */
+ if ((message = tree_get(&update->messages, msgid)) == NULL) {
+ message = xcalloc(1, sizeof *message, "scheduler_insert");
+ message->msgid = msgid;
+ tree_init(&message->envelopes);
+ tree_xset(&update->messages, msgid, message);
}
- rq_evp->sched = ramqueue_next_schedule(si, curtm);
- rq_evp->rq_host = rq_host;
- rq_evp->rq_batch = rq_batch;
- rq_evp->rq_msg = rq_msg;
- RB_INSERT(evptree, &rq_msg->evptree, rq_evp);
- TAILQ_INSERT_TAIL(&rq_batch->envelope_queue, rq_evp,
- batchqueue_entry);
-
- /* sorted insert */
- TAILQ_FOREACH(evp, &ramqueue.queue, queue_entry) {
- if (evp->sched >= rq_evp->sched) {
- TAILQ_INSERT_BEFORE(evp, rq_evp, queue_entry);
- break;
- }
+ /* create envelope in ramqueue message */
+ envelope = xcalloc(1, sizeof *envelope, "scheduler_insert");
+ envelope->evpid = si->evpid;
+ envelope->type = si->type;
+ envelope->message = message;
+ envelope->batch = batch;
+ envelope->sched = scheduler_compute_schedule(si);
+ envelope->expire = si->creation + si->expire;
+
+ if (envelope->expire < envelope->sched) {
+ envelope->flags |= RQ_ENVELOPE_EXPIRED;
+ tree_xset(&update->expired, envelope->evpid, envelope);
}
- if (evp == NULL)
- TAILQ_INSERT_TAIL(&ramqueue.queue, rq_evp, queue_entry);
-
- stat_increment(STATS_RAMQUEUE_ENVELOPE);
-}
-static void
-scheduler_ramqueue_schedule(u_int64_t evpid)
-{
- struct ramqueue_envelope *rq_evp;
- struct ramqueue_message *rq_msg;
- struct ramqueue_batch *rq_batch;
-
- log_debug("scheduler_ramqueue: schedule");
+ tree_xset(&batch->envelopes, envelope->evpid, envelope);
+ tree_xset(&message->envelopes, envelope->evpid, envelope);
- rq_evp = ramqueue_lookup_envelope(evpid);
- rq_msg = rq_evp->rq_msg;
- rq_batch = rq_evp->rq_batch;
+ if (si->type == D_BOUNCE)
+ envelope->queue = &update->bounce;
+ else if (si->type == D_MDA)
+ envelope->queue = &update->mda;
+ else if (si->type == D_MTA)
+ envelope->queue = &update->mta;
+ else
+ errx(1, "bad type");
- /* remove from msg tree, batch queue and linear queue */
- RB_REMOVE(evptree, &rq_msg->evptree, rq_evp);
- TAILQ_REMOVE(&rq_batch->envelope_queue, rq_evp, batchqueue_entry);
- TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry);
+ sorted_insert(envelope->queue, envelope);
- /* insert into offload tree*/
- RB_INSERT(offloadtree, &ramqueue.offloadtree, rq_evp);
-
- /* that's one less envelope to process in the ramqueue */
- stat_decrement(STATS_RAMQUEUE_ENVELOPE);
+ if (verbose & TRACE_SCHEDULER)
+ rq_queue_dump(update, "inserted", time(NULL));
}
static void
-scheduler_ramqueue_remove(u_int64_t evpid)
+scheduler_ramqueue_commit(uint32_t msgid)
{
- struct ramqueue_batch *rq_batch;
- struct ramqueue_message *rq_msg;
- struct ramqueue_envelope *rq_evp;
- struct ramqueue_host *rq_host;
-
- log_debug("scheduler_ramqueue: remove");
-
- rq_evp = ramqueue_lookup_offload(evpid);
- if (rq_evp) {
- RB_REMOVE(offloadtree, &ramqueue.offloadtree, rq_evp);
- rq_msg = rq_evp->rq_msg;
- rq_batch = rq_evp->rq_batch;
- rq_host = rq_evp->rq_host;
- }
- else {
- rq_evp = ramqueue_lookup_envelope(evpid);
- rq_msg = rq_evp->rq_msg;
- rq_batch = rq_evp->rq_batch;
- rq_host = rq_evp->rq_host;
-
- RB_REMOVE(evptree, &rq_msg->evptree, rq_evp);
- TAILQ_REMOVE(&rq_batch->envelope_queue, rq_evp, batchqueue_entry);
- TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry);
- stat_decrement(STATS_RAMQUEUE_ENVELOPE);
- }
+ struct rq_queue *update;
+ time_t now;
- rq_batch->evpcnt--;
- rq_msg->evpcnt--;
+ update = tree_xpop(&updates, msgid);
- /* check if we are the last of a message */
- if (rq_msg->evpcnt == 0) {
- ramqueue_remove_message(rq_msg);
+ if (verbose & TRACE_SCHEDULER) {
+ now = time(NULL);
+ rq_queue_dump(update, "commit update", now);
+ rq_queue_dump(&ramqueue, "before commit", now);
}
+ rq_queue_merge(&ramqueue, update);
- /* check if we are the last of a batch */
- if (rq_batch->evpcnt == 0) {
- ramqueue_remove_batch(rq_host, rq_batch);
- }
-
- /* check if we are the last of a host */
- if (TAILQ_FIRST(&rq_host->batch_queue) == NULL) {
- ramqueue_remove_host(rq_host);
- }
+ if (verbose & TRACE_SCHEDULER)
+ rq_queue_dump(&ramqueue, "after commit", now);
- free(rq_evp);
+ free(update);
}
-static void *
-scheduler_ramqueue_host(char *host)
+static void
+scheduler_ramqueue_rollback(uint32_t msgid)
{
- struct ramqueue_iter *iter;
- struct ramqueue_host *rq_host;
-
- rq_host = ramqueue_lookup_host(host);
- if (rq_host == NULL)
- return NULL;
+ struct rq_queue *update;
+ struct rq_envelope *envelope;
- iter = calloc(1, sizeof *iter);
- if (iter == NULL)
- err(1, "calloc");
+ update = tree_xpop(&updates, msgid);
- iter->type = RAMQUEUE_ITER_HOST;
- iter->u.host = rq_host;
+ while ((envelope = TAILQ_FIRST(&update->bounce)))
+ rq_envelope_delete(update, envelope);
+ while ((envelope = TAILQ_FIRST(&update->mda)))
+ rq_envelope_delete(update, envelope);
+ while ((envelope = TAILQ_FIRST(&update->mta)))
+ rq_envelope_delete(update, envelope);
- return iter;
+ free(update);
}
-static void *
-scheduler_ramqueue_batch(u_int64_t evpid)
+static void
+scheduler_ramqueue_update(struct scheduler_info *si)
{
- struct ramqueue_iter *iter;
- struct ramqueue_envelope *rq_evp;
+ struct rq_message *message;
+ struct rq_envelope *envelope;
+ uint32_t msgid;
- rq_evp = ramqueue_lookup_envelope(evpid);
- if (rq_evp == NULL)
- return NULL;
+ msgid = evpid_to_msgid(si->evpid);
+ message = tree_xget(&ramqueue.messages, msgid);
+ envelope = tree_xget(&message->envelopes, si->evpid);
- iter = calloc(1, sizeof *iter);
- if (iter == NULL)
- err(1, "calloc");
+ /* it *should* be in-flight */
+ if (!(envelope->flags & RQ_ENVELOPE_INFLIGHT))
+ log_warnx("evp:%016" PRIx64 " not in-flight", si->evpid);
+
+ envelope->flags &= ~RQ_ENVELOPE_INFLIGHT;
+ envelope->sched = scheduler_compute_schedule(si);
+ if (envelope->expire < envelope->sched) {
+ envelope->flags |= RQ_ENVELOPE_EXPIRED;
+ tree_xset(&ramqueue.expired, envelope->evpid, envelope);
+ }
- iter->type = RAMQUEUE_ITER_BATCH;
- iter->u.batch = rq_evp->rq_batch;
+ TAILQ_REMOVE(envelope->queue, envelope, entry);
+ if (si->type == D_BOUNCE)
+ envelope->queue = &ramqueue.bounce;
+ else if (si->type == D_MDA)
+ envelope->queue = &ramqueue.mda;
+ else if (si->type == D_MTA)
+ envelope->queue = &ramqueue.mta;
- return iter;
+ sorted_insert(envelope->queue, envelope);
}
-static void *
-scheduler_ramqueue_message(u_int32_t msgid)
+static void
+scheduler_ramqueue_delete(uint64_t evpid)
{
- struct ramqueue_iter *iter;
- struct ramqueue_message *rq_msg;
-
- rq_msg = ramqueue_lookup_message(msgid);
- if (rq_msg == NULL)
- return NULL;
-
- iter = calloc(1, sizeof *iter);
- if (iter == NULL)
- err(1, "calloc");
+ struct rq_message *message;
+ struct rq_envelope *envelope;
+ uint32_t msgid;
- iter->type = RAMQUEUE_ITER_MESSAGE;
- iter->u.message = rq_msg;
+ msgid = evpid_to_msgid(evpid);
+ message = tree_xget(&ramqueue.messages, msgid);
+ envelope = tree_xget(&message->envelopes, evpid);
- return iter;
+ /* it *must* be in-flight */
+ if (!(envelope->flags & RQ_ENVELOPE_INFLIGHT))
+ errx(1, "evp:%016" PRIx64 " not in-flight", evpid);
+ rq_envelope_delete(&ramqueue, envelope);
}
-static void *
-scheduler_ramqueue_queue(void)
-{
- struct ramqueue_iter *iter;
+static int
+scheduler_ramqueue_next(int typemask, uint64_t *evpid, time_t *sched)
+{
+ struct rq_envelope *evp_mda = NULL;
+ struct rq_envelope *evp_mta = NULL;
+ struct rq_envelope *evp_bounce = NULL;
+ struct rq_envelope *envelope = NULL;
+
+ if (verbose & TRACE_SCHEDULER)
+ rq_queue_dump(&ramqueue, "next", time(NULL));
+
+ *sched = 0;
+
+ if (typemask & SCHED_REMOVE && tree_root(&ramqueue.removed, evpid, NULL))
+ return (1);
+ if (typemask & SCHED_EXPIRE && tree_root(&ramqueue.expired, evpid, NULL))
+ return (1);
+
+ /* fetch first envelope from each queue */
+ if (typemask & SCHED_BOUNCE)
+ evp_bounce = TAILQ_FIRST(&ramqueue.bounce);
+ if (typemask & SCHED_MDA)
+ evp_mda = TAILQ_FIRST(&ramqueue.mda);
+ if (typemask & SCHED_MTA)
+ evp_mta = TAILQ_FIRST(&ramqueue.mta);
+
+ /* set current envelope to either one */
+ if (evp_bounce)
+ envelope = evp_bounce;
+ else if (evp_mda)
+ envelope = evp_mda;
+ else if (evp_mta)
+ envelope = evp_mta;
+ else
+ return (0);
- iter = calloc(1, sizeof *iter);
- if (iter == NULL)
- err(1, "calloc");
+ /* figure out which one should be scheduled first */
+ if (evp_bounce && evp_bounce->sched < envelope->sched)
+ envelope = evp_bounce;
+ if (evp_mda && evp_mda->sched < envelope->sched)
+ envelope = evp_mda;
+ if (evp_mta && evp_mta->sched < envelope->sched)
+ envelope = evp_mta;
- iter->type = RAMQUEUE_ITER_QUEUE;
+ *evpid = envelope->evpid;
+ *sched = envelope->sched;
- return iter;
+ return (1);
}
static void
-scheduler_ramqueue_close(void *hdl)
-{
- free(hdl);
-}
-
-int
-scheduler_ramqueue_fetch(void *hdl, u_int64_t *evpid)
-{
- struct ramqueue_iter *iter = hdl;
- struct ramqueue_envelope *rq_evp;
- struct ramqueue_batch *rq_batch;
-
- switch (iter->type) {
- case RAMQUEUE_ITER_HOST:
- rq_batch = TAILQ_FIRST(&iter->u.host->batch_queue);
- if (rq_batch == NULL)
- break;
- rq_evp = TAILQ_FIRST(&rq_batch->envelope_queue);
- if (rq_evp == NULL)
- break;
- *evpid = rq_evp->evpid;
- return 1;
-
- case RAMQUEUE_ITER_BATCH:
- rq_evp = TAILQ_FIRST(&iter->u.batch->envelope_queue);
- if (rq_evp == NULL)
- break;
- *evpid = rq_evp->evpid;
- return 1;
-
- case RAMQUEUE_ITER_MESSAGE:
- rq_evp = RB_ROOT(&iter->u.message->evptree);
- if (rq_evp == NULL)
- break;
- *evpid = rq_evp->evpid;
- return 1;
-
- case RAMQUEUE_ITER_QUEUE:
- rq_evp = TAILQ_FIRST(&ramqueue.queue);
- if (rq_evp == NULL)
- break;
- *evpid = rq_evp->evpid;
- return 1;
+scheduler_ramqueue_batch(int typemask, time_t curr, struct scheduler_batch *ret)
+{
+ struct rq_message *message;
+ struct rq_batch *batch;
+ struct rq_envelope *envelope;
+ struct id_list *item;
+ uint64_t evpid;
+ void *i;
+ int type;
+ time_t sched;
+
+ ret->evpids = NULL;
+
+ if (!scheduler_ramqueue_next(typemask, &evpid, &sched)) {
+ ret->type = SCHED_NONE;
+ return;
}
- return 0;
-}
-
-static int
-scheduler_ramqueue_force(u_int64_t id)
-{
- struct ramqueue_envelope *rq_evp;
- struct ramqueue_message *rq_msg;
- int ret;
-
- /* schedule *all* */
- if (id == 0) {
- ret = 0;
- TAILQ_FOREACH(rq_evp, &ramqueue.queue, queue_entry) {
- rq_evp->sched = 0;
- ret++;
+ if (tree_get(&ramqueue.removed, evpid)) {
+ ret->type = SCHED_REMOVE;
+ while (tree_poproot(&ramqueue.removed, &evpid, NULL)) {
+ item = xmalloc(sizeof *item, "schedule_batch");
+ item->id = evpid;
+ item->next = ret->evpids;
+ ret->evpids = item;
}
- return ret;
+ return;
}
- /* scheduling by evpid */
- if (id > 0xffffffffL) {
- rq_evp = ramqueue_lookup_envelope(id);
- if (rq_evp == NULL)
- return 0;
+ message = tree_xget(&ramqueue.messages, evpid_to_msgid(evpid));
+ envelope = tree_xget(&message->envelopes, evpid);
+
+ /* if the envelope has expired, return the expired list */
+ if (envelope->flags & RQ_ENVELOPE_EXPIRED) {
+ ret->type = SCHED_EXPIRE;
+ while (tree_poproot(&ramqueue.expired, &evpid, (void**)&envelope)) {
+ TAILQ_REMOVE(envelope->queue, envelope, entry);
+ TAILQ_INSERT_TAIL(&ramqueue.inflight, envelope, entry);
+ envelope->flags |= RQ_ENVELOPE_INFLIGHT;
+ item = xmalloc(sizeof *item, "schedule_batch");
+ item->id = evpid;
+ item->next = ret->evpids;
+ ret->evpids = item;
+ }
+ return;
+ }
- rq_evp->sched = 0;
- TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry);
- TAILQ_INSERT_HEAD(&ramqueue.queue, rq_evp, queue_entry);
- return 1;
+ if (sched > curr) {
+ ret->type = SCHED_DELAY;
+ ret->delay = sched - curr;
+ return;
}
- rq_msg = ramqueue_lookup_message(id);
- if (rq_msg == NULL)
- return 0;
-
- /* scheduling by msgid */
- ret = 0;
- RB_FOREACH(rq_evp, evptree, &rq_msg->evptree) {
- rq_evp->sched = 0;
- TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry);
- TAILQ_INSERT_HEAD(&ramqueue.queue, rq_evp, queue_entry);
- ret++;
+ batch = envelope->batch;
+ type = envelope->type;
+ if (type == D_BOUNCE)
+ ret->type = SCHED_BOUNCE;
+ else if (type == D_MDA)
+ ret->type = SCHED_MDA;
+ else if (type == D_MTA)
+ ret->type = SCHED_MTA;
+
+ i = NULL;
+ while((tree_iter(&batch->envelopes, &i, &evpid, (void*)&envelope))) {
+ if (envelope->type != type)
+ continue;
+ if (envelope->sched > curr)
+ continue;
+ if (envelope->flags & RQ_ENVELOPE_INFLIGHT)
+ continue;
+ if (envelope->flags & RQ_ENVELOPE_EXPIRED)
+ continue;
+ TAILQ_REMOVE(envelope->queue, envelope, entry);
+ TAILQ_INSERT_TAIL(&ramqueue.inflight, envelope, entry);
+ envelope->queue = &ramqueue.inflight;
+ envelope->flags |= RQ_ENVELOPE_INFLIGHT;
+ item = xmalloc(sizeof *item, "schedule_batch");
+ item->id = evpid;
+ item->next = ret->evpids;
+ ret->evpids = item;
}
- return ret;
}
-static struct ramqueue_host *
-ramqueue_lookup_host(char *host)
-{
- struct ramqueue_host hostkey;
- strlcpy(hostkey.hostname, host, sizeof(hostkey.hostname));
- return RB_FIND(hosttree, &ramqueue.hosttree, &hostkey);
-}
-
-static struct ramqueue_message *
-ramqueue_lookup_message(u_int32_t msgid)
+static void
+scheduler_ramqueue_schedule(uint64_t evpid)
{
- struct ramqueue_message msgkey;
+ struct rq_message *message;
+ struct rq_envelope *envelope;
+ uint32_t msgid;
- msgkey.msgid = msgid;
- return RB_FIND(msgtree, &ramqueue.msgtree, &msgkey);
-}
+ msgid = evpid_to_msgid(evpid);
+ if ((message = tree_get(&ramqueue.messages, msgid)) == NULL)
+ return;
+ if ((envelope = tree_xget(&message->envelopes, evpid)) == NULL)
+ return;
+ if (envelope->flags & RQ_ENVELOPE_INFLIGHT)
+ return;
-static struct ramqueue_envelope *
-ramqueue_lookup_offload(u_int64_t evpid)
-{
- struct ramqueue_envelope evpkey;
+ envelope->sched = time(NULL);
+ TAILQ_REMOVE(envelope->queue, envelope, entry);
+ sorted_insert(envelope->queue, envelope);
+}
- evpkey.evpid = evpid;
- return RB_FIND(offloadtree, &ramqueue.offloadtree, &evpkey);
+static void
+scheduler_ramqueue_remove(uint64_t evpid)
+{
+ struct rq_message *message;
+ struct rq_envelope *envelope;
+ uint32_t msgid;
+ struct evplist rmlist;
+ void *i;
+
+ if (evpid > 0xffffffff) {
+ msgid = evpid_to_msgid(evpid);
+ if ((message = tree_get(&ramqueue.messages, msgid)) == NULL)
+ return;
+ if ((envelope = tree_xget(&message->envelopes, evpid)) == NULL)
+ return;
+ if (envelope->flags & RQ_ENVELOPE_INFLIGHT)
+ return;
+ rq_envelope_delete(&ramqueue, envelope);
+ tree_xset(&ramqueue.removed, evpid, &ramqueue);
+ }
+ else {
+ msgid = evpid;
+ if ((message = tree_get(&ramqueue.messages, msgid)) == NULL)
+ return;
+
+ TAILQ_INIT(&rmlist);
+ i = NULL;
+ while (tree_iter(&message->envelopes, &i, &evpid,
+ (void*)(&envelope))) {
+ if (envelope->flags & RQ_ENVELOPE_INFLIGHT)
+ continue;
+ tree_xset(&ramqueue.removed, evpid, &ramqueue);
+ TAILQ_REMOVE(envelope->queue, envelope, entry);
+ envelope->queue = &rmlist;
+ TAILQ_INSERT_HEAD(&rmlist, envelope, entry);
+ }
+ while((envelope = TAILQ_FIRST(&rmlist)))
+ rq_envelope_delete(&ramqueue, envelope);
+ }
}
-static struct ramqueue_envelope *
-ramqueue_lookup_envelope(u_int64_t evpid)
+static void
+sorted_insert(struct evplist *list, struct rq_envelope *evp)
{
- struct ramqueue_message *rq_msg;
- struct ramqueue_envelope evpkey;
+ struct rq_envelope *item;
- rq_msg = ramqueue_lookup_message(evpid_to_msgid(evpid));
- if (rq_msg == NULL)
- return NULL;
-
- evpkey.evpid = evpid;
- return RB_FIND(evptree, &rq_msg->evptree, &evpkey);
+ TAILQ_FOREACH(item, list, entry) {
+ if (evp->sched < item->sched) {
+ TAILQ_INSERT_BEFORE(item, evp, entry);
+ return;
+ }
+ }
+ TAILQ_INSERT_TAIL(list, evp, entry);
}
-static struct ramqueue_batch *
-ramqueue_lookup_batch(struct ramqueue_host *rq_host, u_int32_t msgid)
+static void
+sorted_merge(struct evplist *list, struct evplist *from)
{
- struct ramqueue_batch *rq_batch;
+ struct rq_envelope *e;
- TAILQ_FOREACH(rq_batch, &rq_host->batch_queue, batch_entry) {
- if (rq_batch->msgid == msgid)
- return rq_batch;
+ /* XXX this is O(not good enough) */
+ while ((e = TAILQ_LAST(from, evplist))) {
+ TAILQ_REMOVE(from, e, entry);
+ sorted_insert(list, e);
+ e->queue = list;
}
-
- return NULL;
}
-static int
-ramqueue_expire(struct envelope *envelope)
+static void
+rq_queue_init(struct rq_queue *rq)
{
- struct envelope bounce;
- struct scheduler_info si;
- time_t curtm;
-
- curtm = time(NULL);
- if (curtm - envelope->creation >= envelope->expire) {
- envelope_set_errormsg(envelope,
- "message expired after sitting in queue for %d days",
- envelope->expire / 60 / 60 / 24);
- bounce_record_message(envelope, &bounce);
-
- scheduler_info(&si, &bounce);
- scheduler_ramqueue_insert(&si);
-
- log_debug("#### %s: queue_envelope_delete: %016" PRIx64,
- __func__, envelope->id);
- queue_envelope_delete(envelope);
- return 1;
- }
- return 0;
+ bzero(rq, sizeof *rq);
+
+ tree_init(&rq->messages);
+ tree_init(&rq->expired);
+ tree_init(&rq->removed);
+ SPLAY_INIT(&rq->hosts);
+ TAILQ_INIT(&rq->mda);
+ TAILQ_INIT(&rq->mta);
+ TAILQ_INIT(&rq->bounce);
+ TAILQ_INIT(&rq->inflight);
}
-static time_t
-ramqueue_next_schedule(struct scheduler_info *si, time_t curtm)
-{
- time_t delay;
-
- if (si->lasttry == 0)
- return curtm;
-
- delay = SMTPD_QUEUE_MAXINTERVAL;
-
- if (si->type == D_MDA ||
- si->type == D_BOUNCE) {
- if (si->retry < 5)
- return curtm;
-
- if (si->retry < 15)
- delay = (si->retry * 60) + arc4random_uniform(60);
+static void
+rq_queue_merge(struct rq_queue *rq, struct rq_queue *update)
+{
+ struct rq_message *message, *tomessage;
+ struct rq_batch *batch, *tobatch;
+ struct rq_host *host, *tohost;
+ struct rq_envelope *envelope;
+ uint64_t id;
+ void *i;
+
+ /* merge host tree */
+ while ((host = SPLAY_ROOT(&update->hosts))) {
+ SPLAY_REMOVE(hosttree, &update->hosts, host);
+ tohost = rq_host_lookup(&rq->hosts, host->host);
+ if (tohost == NULL)
+ tohost = rq_host_create(&rq->hosts, host->host);
+ /* merge batches for current host */
+ while (tree_poproot(&host->batches, &id, (void*)&batch)) {
+ tobatch = tree_get(&tohost->batches, batch->msgid);
+ if (tobatch == NULL) {
+ /* batch does not exist. re-use structure */
+ batch->host = tohost;
+ tree_xset(&tohost->batches, batch->msgid, batch);
+ continue;
+ }
+ /* need to re-link all envelopes before merging them */
+ i = NULL;
+ while((tree_iter(&batch->envelopes, &i, &id,
+ (void*)&envelope)))
+ envelope->batch = tobatch;
+ tree_merge(&tobatch->envelopes, &batch->envelopes);
+ free(batch);
+ }
+ free(host);
}
- if (si->type == D_MTA) {
- if (si->retry < 3)
- delay = SMTPD_QUEUE_INTERVAL;
- else if (si->retry <= 7) {
- delay = SMTPD_QUEUE_INTERVAL * (1 << (si->retry - 3));
- if (delay > SMTPD_QUEUE_MAXINTERVAL)
- delay = SMTPD_QUEUE_MAXINTERVAL;
+ while (tree_poproot(&update->messages, &id, (void*)&message)) {
+ if ((tomessage = tree_get(&rq->messages, id)) == NULL) {
+ /* message does not exist. re-use structure */
+ tree_xset(&rq->messages, id, message);
+ continue;
}
+ /* need to re-link all envelopes before merging them */
+ i = NULL;
+ while((tree_iter(&message->envelopes, &i, &id,
+ (void*)&envelope)))
+ envelope->message = tomessage;
+ tree_merge(&tomessage->envelopes, &message->envelopes);
+ free(message);
}
- if (curtm >= si->lasttry + delay)
- return curtm;
+ sorted_merge(&rq->bounce, &update->bounce);
+ sorted_merge(&rq->mda, &update->mda);
+ sorted_merge(&rq->mta, &update->mta);
- return curtm + delay;
+ tree_merge(&rq->expired, &update->expired);
+ tree_merge(&rq->removed, &update->removed);
}
-static struct ramqueue_message *
-ramqueue_insert_message(u_int32_t msgid)
+static void
+rq_envelope_delete(struct rq_queue *rq, struct rq_envelope *envelope)
{
- struct ramqueue_message *rq_msg;
+ struct rq_message *message;
+ struct rq_batch *batch;
+ struct rq_host *host;
- rq_msg = calloc(1, sizeof (*rq_msg));
- if (rq_msg == NULL)
- fatal("calloc");
- rq_msg->msgid = msgid;
- RB_INSERT(msgtree, &ramqueue.msgtree, rq_msg);
- RB_INIT(&rq_msg->evptree);
- stat_increment(STATS_RAMQUEUE_MESSAGE);
+ if (envelope->flags & RQ_ENVELOPE_EXPIRED)
+ tree_pop(&rq->expired, envelope->evpid);
- return rq_msg;
-}
+ TAILQ_REMOVE(envelope->queue, envelope, entry);
+ batch = envelope->batch;
+ message = envelope->message;
+ host = batch->host;
-static struct ramqueue_host *
-ramqueue_insert_host(char *host)
-{
- struct ramqueue_host *rq_host;
-
- rq_host = calloc(1, sizeof (*rq_host));
- if (rq_host == NULL)
- fatal("calloc");
- strlcpy(rq_host->hostname, host, sizeof(rq_host->hostname));
- TAILQ_INIT(&rq_host->batch_queue);
- RB_INSERT(hosttree, &ramqueue.hosttree, rq_host);
- stat_increment(STATS_RAMQUEUE_HOST);
+ tree_xpop(&message->envelopes, envelope->evpid);
+ if (tree_empty(&message->envelopes)) {
+ tree_xpop(&rq->messages, message->msgid);
+ free(message);
+ }
- return rq_host;
+ tree_xpop(&batch->envelopes, envelope->evpid);
+ if (tree_empty(&batch->envelopes)) {
+ tree_xpop(&host->batches, message->msgid);
+ if (tree_empty(&host->batches)) {
+ SPLAY_REMOVE(hosttree, &rq->hosts, host);
+ free(host);
+ }
+ free(batch);
+ }
+ free(envelope);
}
-static struct ramqueue_batch *
-ramqueue_insert_batch(struct ramqueue_host *rq_host, u_int32_t msgid)
+static const char *
+rq_envelope_to_text(struct rq_envelope *e, time_t tref)
{
- struct ramqueue_batch *rq_batch;
+ static char buf[256];
+ char t[64];
- rq_batch = calloc(1, sizeof (*rq_batch));
- if (rq_batch == NULL)
- fatal("calloc");
- rq_batch->b_id = generate_uid();
- rq_batch->rq_host = rq_host;
- rq_batch->msgid = msgid;
+ snprintf(buf, sizeof buf, "evp:%016" PRIx64 " [", e->evpid);
- TAILQ_INIT(&rq_batch->envelope_queue);
- TAILQ_INSERT_TAIL(&rq_host->batch_queue, rq_batch, batch_entry);
+ if (e->type == D_BOUNCE)
+ strlcat(buf, "bounce", sizeof buf);
+ else if (e->type == D_MDA)
+ strlcat(buf, "mda", sizeof buf);
+ else if (e->type == D_MTA)
+ strlcat(buf, "mta", sizeof buf);
- stat_increment(STATS_RAMQUEUE_BATCH);
+ snprintf(t, sizeof t, ",sched=%s", duration_to_text(e->sched - tref));
+ strlcat(buf, t, sizeof buf);
+ snprintf(t, sizeof t, ",exp=%s", duration_to_text(e->expire - tref));
+ strlcat(buf, t, sizeof buf);
- return rq_batch;
-}
+ if (e->flags & RQ_ENVELOPE_EXPIRED)
+ strlcat(buf, ",expired", sizeof buf);
+ if (e->flags & RQ_ENVELOPE_INFLIGHT)
+ strlcat(buf, ",in-flight", sizeof buf);
-static void
-ramqueue_remove_host(struct ramqueue_host *rq_host)
-{
- RB_REMOVE(hosttree, &ramqueue.hosttree, rq_host);
- free(rq_host);
- stat_decrement(STATS_RAMQUEUE_HOST);
+ strlcat(buf, "]", sizeof buf);
+
+ return (buf);
}
static void
-ramqueue_remove_message(struct ramqueue_message *rq_msg)
-{
- RB_REMOVE(msgtree, &ramqueue.msgtree, rq_msg);
- free(rq_msg);
- stat_decrement(STATS_RAMQUEUE_MESSAGE);
-}
+rq_queue_dump(struct rq_queue *rq, const char * name, time_t tref)
+{
+ struct rq_host *host;
+ struct rq_batch *batch;
+ struct rq_message *message;
+ struct rq_envelope *envelope;
+ void *i, *j;
+ uint64_t id;
+
+ log_debug("/--- ramqueue: %s", name);
+ SPLAY_FOREACH(host, hosttree, &rq->hosts) {
+ log_debug("| host:%s", host->host);
+ i = NULL;
+ while((tree_iter(&host->batches, &i, &id, (void*)&batch))) {
+ log_debug("| batch:%08" PRIx32, batch->msgid);
+ j = NULL;
+ while((tree_iter(&batch->envelopes, &j, &id,
+ (void*)&envelope)))
+ log_debug("| %s",
+ rq_envelope_to_text(envelope, tref));
+ }
+ }
+ i = NULL;
+ while((tree_iter(&rq->messages, &i, &id, (void*)&message))) {
+ log_debug("| msg:%08" PRIx32, message->msgid);
+ j = NULL;
+ while((tree_iter(&message->envelopes, &j, &id,
+ (void*)&envelope)))
+ log_debug("| %s", rq_envelope_to_text(envelope, tref));
+ }
-static void
-ramqueue_remove_batch(struct ramqueue_host *rq_host,
- struct ramqueue_batch *rq_batch)
-{
- TAILQ_REMOVE(&rq_host->batch_queue, rq_batch, batch_entry);
- free(rq_batch);
- stat_decrement(STATS_RAMQUEUE_BATCH);
+ log_debug("| bounces:");
+ TAILQ_FOREACH(envelope, &rq->bounce, entry)
+ log_debug("| %s", rq_envelope_to_text(envelope, tref));
+ log_debug("| mda:");
+ TAILQ_FOREACH(envelope, &rq->mda, entry)
+ log_debug("| %s", rq_envelope_to_text(envelope, tref));
+ log_debug("| mta:");
+ TAILQ_FOREACH(envelope, &rq->mta, entry)
+ log_debug("| %s", rq_envelope_to_text(envelope, tref));
+ log_debug("| in-flight:");
+ TAILQ_FOREACH(envelope, &rq->inflight, entry)
+ log_debug("| %s", rq_envelope_to_text(envelope, tref));
+ log_debug("\\---");
}
static int
-ramqueue_host_cmp(struct ramqueue_host *h1, struct ramqueue_host *h2)
+rq_host_cmp(struct rq_host *a, struct rq_host *b)
{
- return strcmp(h1->hostname, h2->hostname);
+ return (strcmp(a->host, b->host));
}
-
-static int
-ramqueue_msg_cmp(struct ramqueue_message *m1, struct ramqueue_message *m2)
+static struct rq_host *
+rq_host_lookup(struct hosttree *host_tree, char *host)
{
- return (m1->msgid < m2->msgid ? -1 : m1->msgid > m2->msgid);
+ struct rq_host key;
+
+ strlcpy(key.host, host, sizeof key.host);
+ return (SPLAY_FIND(hosttree, host_tree, &key));
}
-static int
-ramqueue_evp_cmp(struct ramqueue_envelope *e1, struct ramqueue_envelope *e2)
+static struct rq_host *
+rq_host_create(struct hosttree *host_tree, char *host)
{
- return (e1->evpid < e2->evpid ? -1 : e1->evpid > e2->evpid);
-}
+ struct rq_host *rq_host;
-RB_GENERATE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp);
-RB_GENERATE(msgtree, ramqueue_message, msgtree_entry, ramqueue_msg_cmp);
-RB_GENERATE(evptree, ramqueue_envelope, evptree_entry, ramqueue_evp_cmp);
-RB_GENERATE(offloadtree, ramqueue_envelope, evptree_entry, ramqueue_evp_cmp);
+ rq_host = xcalloc(1, sizeof *rq_host, "rq_host_create");
+ tree_init(&rq_host->batches);
+ strlcpy(rq_host->host, host, sizeof rq_host->host);
+ SPLAY_INSERT(hosttree, host_tree, rq_host);
+
+ return (rq_host);
+}
+SPLAY_GENERATE(hosttree, rq_host, hosttree_entry, rq_host_cmp);
diff --git a/usr.sbin/smtpd/smtpd.h b/usr.sbin/smtpd/smtpd.h
index b98cf72f6f9..7941fd73bba 100644
--- a/usr.sbin/smtpd/smtpd.h
+++ b/usr.sbin/smtpd/smtpd.h
@@ -1,8 +1,9 @@
-/* $OpenBSD: smtpd.h,v 1.314 2012/08/07 21:47:57 eric Exp $ */
+/* $OpenBSD: smtpd.h,v 1.315 2012/08/08 08:50:42 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
* Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
+ * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
@@ -916,8 +917,8 @@ struct delivery_backend {
};
struct scheduler_info {
- u_int64_t evpid;
- char destination[MAXHOSTNAMELEN];
+ u_int64_t evpid;
+ char destination[MAXHOSTNAMELEN];
enum delivery_type type;
time_t creation;
@@ -926,30 +927,41 @@ struct scheduler_info {
u_int8_t retry;
};
+struct id_list {
+ struct id_list *next;
+ uint64_t id;
+};
+
+#define SCHED_NONE 0x00
+#define SCHED_DELAY 0x01
+#define SCHED_REMOVE 0x02
+#define SCHED_EXPIRE 0x04
+#define SCHED_BOUNCE 0x08
+#define SCHED_MDA 0x10
+#define SCHED_MTA 0x20
+
+struct scheduler_batch {
+ int type;
+ time_t delay;
+ struct id_list *evpids;
+};
+
struct scheduler_backend {
void (*init)(void);
- int (*setup)(void);
-
- int (*next)(u_int64_t *, time_t *);
void (*insert)(struct scheduler_info *);
- void (*schedule)(u_int64_t);
- void (*remove)(u_int64_t);
+ void (*commit)(u_int32_t);
+ void (*rollback)(u_int32_t);
- void *(*host)(char *);
- void *(*message)(u_int32_t);
- void *(*batch)(u_int64_t);
- void *(*queue)(void);
- void (*close)(void *);
+ void (*update)(struct scheduler_info *);
+ void (*delete)(u_int64_t);
- int (*fetch)(void *, u_int64_t *);
- int (*force)(u_int64_t);
+ void (*batch)(int, time_t, struct scheduler_batch *);
- void (*display)(void); /* may be NULL */
+ void (*schedule)(u_int64_t);
+ void (*remove)(u_int64_t);
};
-
-
extern struct smtpd *env;
extern void (*imsg_callback)(struct imsgev *, struct imsg *);
@@ -1100,13 +1112,11 @@ void qwalk_close(void *);
/* scheduler.c */
pid_t scheduler(void);
-void message_reset_flags(struct envelope *);
-
-/* scheduler.c */
+/* scheduler_bakend.c */
struct scheduler_backend *scheduler_backend_lookup(const char *);
void scheduler_info(struct scheduler_info *, struct envelope *);
-
+time_t scheduler_compute_schedule(struct scheduler_info *);
/* smtp.c */
pid_t smtp(void);
@@ -1199,6 +1209,7 @@ int valid_localpart(const char *);
int valid_domainpart(const char *);
char *ss_to_text(struct sockaddr_storage *);
char *time_to_text(time_t);
+char *duration_to_text(time_t);
int secure_file(int, char *, char *, uid_t, int);
int lowercase(char *, char *, size_t);
void xlowercase(char *, char *, size_t);
diff --git a/usr.sbin/smtpd/util.c b/usr.sbin/smtpd/util.c
index 7000a366c1d..8fa0beff0c4 100644
--- a/usr.sbin/smtpd/util.c
+++ b/usr.sbin/smtpd/util.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: util.c,v 1.68 2012/08/07 21:47:58 eric Exp $ */
+/* $OpenBSD: util.c,v 1.69 2012/08/08 08:50:42 eric Exp $ */
/*
* Copyright (c) 2000,2001 Markus Friedl. All rights reserved.
@@ -538,6 +538,51 @@ time_to_text(time_t when)
return buf;
}
+char *
+duration_to_text(time_t t)
+{
+ static char dst[64];
+ char buf[64];
+ int d, h, m, s;
+
+ if (t == 0) {
+ strlcpy(dst, "0s", sizeof dst);
+ return (dst);
+ }
+
+ dst[0] = '\0';
+ if (t < 0) {
+ strlcpy(dst, "-", sizeof dst);
+ t = -t;
+ }
+
+ s = t % 60;
+ t /= 60;
+ m = t % 60;
+ t /= 60;
+ h = t % 24;
+ d = t / 24;
+
+ if (d) {
+ snprintf(buf, sizeof buf, "%id", d);
+ strlcat(dst, buf, sizeof dst);
+ }
+ if (h) {
+ snprintf(buf, sizeof buf, "%ih", h);
+ strlcat(dst, buf, sizeof dst);
+ }
+ if (m) {
+ snprintf(buf, sizeof buf, "%im", m);
+ strlcat(dst, buf, sizeof dst);
+ }
+ if (s) {
+ snprintf(buf, sizeof buf, "%is", s);
+ strlcat(dst, buf, sizeof dst);
+ }
+
+ return (dst);
+}
+
int
text_to_netaddr(struct netaddr *netaddr, char *s)
{