From 4fb538ff7fe692fc3494a906f82178b14bf48bf1 Mon Sep 17 00:00:00 2001 From: Eric Faurot Date: Wed, 8 Aug 2012 08:50:43 +0000 Subject: Improve the scheduler backend API. New envelopes are pushed into the scheduler through the insert() commit() rollback() transactional interface functions. Worklists are pulled from the scheduler through a single batch() interface function, which returns a list of envelope ids and the type of processing. Envelopes returned in this batch are said to be "in-flight", as opposed to "pending". They are supposed to be processed in some way, and either updated() or deleted() at some point. The schedule()/remove() functions are used to alter the internal state of "pending" envelopes to make them schedulable. The enve- lopes will be part of a worklist on the next call to batch(). Rewrite the scheduler_ramqueue backend. The initial queue loading in now done by the queue. ok gilles@ --- usr.sbin/smtpd/bounce.c | 8 +- usr.sbin/smtpd/queue.c | 58 +- usr.sbin/smtpd/scheduler.c | 454 +++++-------- usr.sbin/smtpd/scheduler_backend.c | 23 +- usr.sbin/smtpd/scheduler_ramqueue.c | 1199 ++++++++++++++++------------------- usr.sbin/smtpd/smtpd.h | 55 +- usr.sbin/smtpd/util.c | 47 +- 7 files changed, 896 insertions(+), 948 deletions(-) diff --git a/usr.sbin/smtpd/bounce.c b/usr.sbin/smtpd/bounce.c index c2147d12297..f31b322ef29 100644 --- a/usr.sbin/smtpd/bounce.c +++ b/usr.sbin/smtpd/bounce.c @@ -1,4 +1,4 @@ -/* $OpenBSD: bounce.c,v 1.42 2012/07/09 09:57:53 gilles Exp $ */ +/* $OpenBSD: bounce.c,v 1.43 2012/08/08 08:50:42 eric Exp $ */ /* * Copyright (c) 2009 Gilles Chehade @@ -60,6 +60,10 @@ struct bounce { struct io io; }; +/* XXX remove later */ +void scheduler_envelope_update(struct envelope *); +void scheduler_envelope_delete(struct envelope *); + static void bounce_send(struct bounce *, const char *, ...); static int bounce_next(struct bounce *); static void bounce_status(struct bounce *, const char *, ...); @@ -210,10 +214,12 @@ bounce_status(struct bounce *bounce, const char *fmt, ...) log_debug("#### %s: queue_envelope_delete: %016" PRIx64, __func__, bounce->evp.id); queue_envelope_delete(&bounce->evp); + scheduler_envelope_delete(&bounce->evp); } else { bounce->evp.retry++; envelope_set_errormsg(&bounce->evp, "%s", status); queue_envelope_update(&bounce->evp); + scheduler_envelope_update(&bounce->evp); } bounce->evp.id = 0; free(status); diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c index d0e9549607b..c8c2d31403c 100644 --- a/usr.sbin/smtpd/queue.c +++ b/usr.sbin/smtpd/queue.c @@ -1,8 +1,9 @@ -/* $OpenBSD: queue.c,v 1.121 2012/07/09 09:57:53 gilles Exp $ */ +/* $OpenBSD: queue.c,v 1.122 2012/08/08 08:50:42 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade * Copyright (c) 2008 Pierre-Yves Ritschard + * Copyright (c) 2012 Eric Faurot * * Permission to use, copy, modify, and distribute this software for any * purpose with or without fee is hereby granted, provided that the above @@ -39,6 +40,7 @@ #include "log.h" static void queue_imsg(struct imsgev *, struct imsg *); +static void queue_timeout(int, short, void *); static void queue_pass_to_scheduler(struct imsgev *, struct imsg *); static void queue_shutdown(void); static void queue_sig_handler(int, short, void *); @@ -224,7 +226,8 @@ queue(void) { pid_t pid; struct passwd *pw; - + struct timeval tv; + struct event ev_qload; struct event ev_sigint; struct event ev_sigterm; @@ -286,6 +289,12 @@ queue(void) config_pipes(peers, nitems(peers)); config_peers(peers, nitems(peers)); + /* setup queue loading task */ + evtimer_set(&ev_qload, queue_timeout, &ev_qload); + tv.tv_sec = 0; + tv.tv_usec = 10; + evtimer_add(&ev_qload, &tv); + if (event_dispatch() < 0) fatal("event_dispatch"); queue_shutdown(); @@ -293,6 +302,51 @@ queue(void) return (0); } +void +queue_timeout(int fd, short event, void *p) +{ + static struct qwalk *q = NULL; + struct event *ev = p; + static uint64_t last_evpid = 0; + struct envelope envelope; + struct timeval tv; + uint64_t evpid; + + if (q == NULL) { + log_info("queue: loading queue into scheduler"); + q = qwalk_new(0); + } + + while (qwalk(q, &evpid)) { + if (! queue_envelope_load(evpid, &envelope)) + continue; + + if (evpid_to_msgid(evpid) != evpid_to_msgid(last_evpid) && + last_evpid != 0) { + envelope.id = last_evpid; + imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], + IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &envelope, + sizeof envelope); + } + + last_evpid = evpid; + tv.tv_sec = 0; + tv.tv_usec = 0; + evtimer_add(ev, &tv); + return; + } + + if (last_evpid) { + envelope.id = last_evpid; + imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], + IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &envelope, + sizeof envelope); + } + + log_info("queue: done loading queue into scheduler"); + qwalk_close(q); +} + void queue_submit_envelope(struct envelope *ep) { diff --git a/usr.sbin/smtpd/scheduler.c b/usr.sbin/smtpd/scheduler.c index 490e4db546e..9efaf8029a4 100644 --- a/usr.sbin/smtpd/scheduler.c +++ b/usr.sbin/smtpd/scheduler.c @@ -1,9 +1,10 @@ -/* $OpenBSD: scheduler.c,v 1.7 2012/07/18 22:04:49 eric Exp $ */ +/* $OpenBSD: scheduler.c,v 1.8 2012/08/08 08:50:42 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade * Copyright (c) 2008 Pierre-Yves Ritschard * Copyright (c) 2008-2009 Jacek Masiulaniec + * Copyright (c) 2012 Eric Faurot * * Permission to use, copy, modify, and distribute this software for any * purpose with or without fee is hereby granted, provided that the above @@ -47,17 +48,18 @@ static void scheduler_imsg(struct imsgev *, struct imsg *); static void scheduler_shutdown(void); static void scheduler_sig_handler(int, short, void *); -static void scheduler_setup_events(void); static void scheduler_reset_events(void); -static void scheduler_disable_events(void); static void scheduler_timeout(int, short, void *); -static void scheduler_remove(u_int64_t); -static void scheduler_remove_envelope(u_int64_t); -static int scheduler_process_envelope(u_int64_t); -static int scheduler_process_batch(enum delivery_type, u_int64_t); -static int scheduler_check_loop(struct envelope *); +static void scheduler_process_remove(struct scheduler_batch *); +static void scheduler_process_expire(struct scheduler_batch *); +static void scheduler_process_bounce(struct scheduler_batch *); +static void scheduler_process_mda(struct scheduler_batch *); +static void scheduler_process_mta(struct scheduler_batch *); static int scheduler_load_message(u_int32_t); +void scheduler_envelope_update(struct envelope *); +void scheduler_envelope_delete(struct envelope *); + static struct scheduler_backend *backend = NULL; extern const char *backend_scheduler; @@ -84,8 +86,9 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg) e = imsg->data; log_trace(TRACE_SCHEDULER, "scheduler: IMSG_QUEUE_DELIVERY_OK: %016"PRIx64, e->id); - backend->remove(e->id); + backend->delete(e->id); queue_envelope_delete(e); + scheduler_reset_events(); return; case IMSG_QUEUE_DELIVERY_TEMPFAIL: @@ -96,7 +99,7 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg) e->retry++; queue_envelope_update(e); scheduler_info(&si, e); - backend->insert(&si); + backend->update(&si); scheduler_reset_events(); return; @@ -109,10 +112,11 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg) bounce_record_message(e, &bounce); scheduler_info(&si, &bounce); backend->insert(&si); - scheduler_reset_events(); + backend->commit(evpid_to_msgid(bounce.id)); } - backend->remove(e->id); + backend->delete(e->id); queue_envelope_delete(e); + scheduler_reset_events(); return; case IMSG_MDA_SESS_NEW: @@ -138,7 +142,7 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg) if (imsg->fd < 0 || !bounce_session(imsg->fd, e)) { queue_envelope_update(e); scheduler_info(&si, e); - backend->insert(&si); + backend->update(&si); scheduler_reset_events(); return; } @@ -175,24 +179,23 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg) log_trace(TRACE_SCHEDULER, "scheduler: IMSG_SCHEDULER_SCHEDULE: %016"PRIx64, *(u_int64_t *)imsg->data); - backend->force(*(u_int64_t *)imsg->data); - scheduler_reset_events(); + backend->schedule(*(u_int64_t *)imsg->data); + scheduler_reset_events(); return; case IMSG_SCHEDULER_REMOVE: log_trace(TRACE_SCHEDULER, "scheduler: IMSG_SCHEDULER_REMOVE: %016"PRIx64, *(u_int64_t *)imsg->data); - scheduler_remove(*(u_int64_t *)imsg->data); + backend->remove(*(u_int64_t *)imsg->data); scheduler_reset_events(); return; - } errx(1, "scheduler_imsg: unexpected %s imsg", imsg_to_str(imsg->hdr.type)); } -void +static void scheduler_sig_handler(int sig, short event, void *p) { switch (sig) { @@ -205,25 +208,14 @@ scheduler_sig_handler(int sig, short event, void *p) } } -void +static void scheduler_shutdown(void) { log_info("scheduler handler exiting"); _exit(0); } -void -scheduler_setup_events(void) -{ - struct timeval tv; - - evtimer_set(&env->sc_ev, scheduler_timeout, NULL); - tv.tv_sec = 0; - tv.tv_usec = 10; - evtimer_add(&env->sc_ev, &tv); -} - -void +static void scheduler_reset_events(void) { struct timeval tv; @@ -233,12 +225,6 @@ scheduler_reset_events(void) evtimer_add(&env->sc_ev, &tv); } -void -scheduler_disable_events(void) -{ - evtimer_del(&env->sc_ev); -} - pid_t scheduler(void) { @@ -304,200 +290,198 @@ scheduler(void) config_pipes(peers, nitems(peers)); config_peers(peers, nitems(peers)); - scheduler_setup_events(); + evtimer_set(&env->sc_ev, scheduler_timeout, NULL); + scheduler_reset_events(); event_dispatch(); - scheduler_disable_events(); scheduler_shutdown(); return (0); } -void +static void scheduler_timeout(int fd, short event, void *p) { - time_t nsched; - time_t curtm; - u_int64_t evpid; - static int setup = 0; - int delay = 0; - struct timeval tv; - - log_trace(TRACE_SCHEDULER, "scheduler: entering scheduler_timeout"); - - /* if we're not done setting up the scheduler, do it some more */ - if (! setup) - setup = backend->setup(); - - /* we don't have a schedulable envelope ... sleep */ - if (! backend->next(&evpid, &nsched)) - goto scheduler_sleep; - - /* is the envelope schedulable right away ? */ - curtm = time(NULL); - if (nsched <= curtm) { - /* yup */ - scheduler_process_envelope(evpid); - } - else { - /* nope, so we can either keep the timeout delay to 0 if we - * are not done setting up the scheduler, or sleep until it - * is time to schedule that envelope otherwise. - */ - if (setup) - delay = nsched - curtm; - } + struct timeval tv; + struct scheduler_batch batch; + int typemask; - if (delay) - log_trace(TRACE_SCHEDULER, "scheduler: pausing for %d seconds", - delay); - tv.tv_sec = delay; - tv.tv_usec = 0; - evtimer_add(&env->sc_ev, &tv); - return; + log_trace(TRACE_SCHEDULER, "scheduler: getting next batch"); -scheduler_sleep: - log_trace(TRACE_SCHEDULER, "scheduler: sleeping"); - return; -} - -static int -scheduler_process_envelope(u_int64_t evpid) -{ - struct envelope envelope; - size_t mta_av, mda_av, bnc_av; - struct scheduler_info si; + tv.tv_sec = 0; + tv.tv_usec = 0; - mta_av = env->sc_maxconn - stat_get(STATS_MTA_SESSION, STAT_ACTIVE); - mda_av = env->sc_maxconn - stat_get(STATS_MDA_SESSION, STAT_ACTIVE); - bnc_av = env->sc_maxconn - stat_get(STATS_SCHEDULER_BOUNCES, STAT_ACTIVE); + typemask = SCHED_REMOVE | SCHED_EXPIRE | SCHED_BOUNCE; + if (!(env->sc_flags & SMTPD_MDA_PAUSED)) + typemask |= SCHED_MDA; + if (!(env->sc_flags & SMTPD_MTA_PAUSED)) + typemask |= SCHED_MTA; - if (! queue_envelope_load(evpid, &envelope)) - return 0; + backend->batch(typemask, time(NULL), &batch); + switch (batch.type) { + case SCHED_NONE: + log_trace(TRACE_SCHEDULER, "scheduler: sleeping"); + return; - if (envelope.type == D_MDA) - if (mda_av == 0) { - env->sc_flags |= SMTPD_MDA_BUSY; - return 0; - } + case SCHED_DELAY: + tv.tv_sec = batch.delay; + log_trace(TRACE_SCHEDULER, + "scheduler: pausing for %li seconds", tv.tv_sec); + break; - if (envelope.type == D_MTA) - if (mta_av == 0) { - env->sc_flags |= SMTPD_MTA_BUSY; - return 0; - } + case SCHED_REMOVE: + scheduler_process_remove(&batch); + break; - if (envelope.type == D_BOUNCE) - if (bnc_av == 0) { - env->sc_flags |= SMTPD_BOUNCE_BUSY; - return 0; - } + case SCHED_EXPIRE: + scheduler_process_expire(&batch); + break; - if (scheduler_check_loop(&envelope)) { - struct envelope bounce; + case SCHED_BOUNCE: + scheduler_process_bounce(&batch); + break; - envelope_set_errormsg(&envelope, "loop has been detected"); - if (bounce_record_message(&envelope, &bounce)) { - scheduler_info(&si, &bounce); - backend->insert(&si); - } - backend->remove(evpid); - queue_envelope_delete(&envelope); + case SCHED_MDA: + scheduler_process_mda(&batch); + break; - scheduler_reset_events(); + case SCHED_MTA: + scheduler_process_mta(&batch); + break; - return 0; + default: + fatalx("scheduler_timeout: unknown batch type"); } + evtimer_add(&env->sc_ev, &tv); +} - return scheduler_process_batch(envelope.type, evpid); +static void +scheduler_process_remove(struct scheduler_batch *batch) +{ + struct envelope evp; + struct id_list *e; + + while ((e = batch->evpids)) { + batch->evpids = e->next; + log_debug("scheduler: deleting evp:%016" PRIx64 " (removed)", + e->id); + evp.id = e->id; + queue_envelope_delete(&evp); + free(e); + } } -static int -scheduler_process_batch(enum delivery_type type, u_int64_t evpid) +static void +scheduler_process_expire(struct scheduler_batch *batch) { struct envelope evp; - void *batch; - int fd; - - batch = backend->batch(evpid); - switch (type) { - case D_BOUNCE: - while (backend->fetch(batch, &evpid)) { - if (! queue_envelope_load(evpid, &evp)) - goto end; - - evp.lasttry = time(NULL); - imsg_compose_event(env->sc_ievs[PROC_QUEUE], - IMSG_SMTP_ENQUEUE, PROC_SMTP, 0, -1, &evp, - sizeof evp); - backend->schedule(evpid); - } + struct id_list *e; + + while ((e = batch->evpids)) { + batch->evpids = e->next; + log_debug("scheduler: deleting evp:%016" PRIx64 " (expire)", + e->id); + evp.id = e->id; + queue_envelope_delete(&evp); + free(e); + } +} + +static void +scheduler_process_bounce(struct scheduler_batch *batch) +{ + struct envelope evp; + struct id_list *e; + + while ((e = batch->evpids)) { + batch->evpids = e->next; + log_debug("scheduler: scheduling evp:%016" PRIx64 " (bounce)", + e->id); + queue_envelope_load(e->id, &evp); + evp.lasttry = time(NULL); + imsg_compose_event(env->sc_ievs[PROC_QUEUE], + IMSG_SMTP_ENQUEUE, PROC_SMTP, 0, -1, &evp, + sizeof evp); stat_increment(STATS_SCHEDULER); stat_increment(STATS_SCHEDULER_BOUNCES); - break; - - case D_MDA: - backend->fetch(batch, &evpid); - if (! queue_envelope_load(evpid, &evp)) - goto end; - + free(e); + } +} + +static void +scheduler_process_mda(struct scheduler_batch *batch) +{ + struct envelope evp; + struct id_list *e; + int fd; + + while ((e = batch->evpids)) { + batch->evpids = e->next; + log_debug("scheduler: scheduling evp:%016" PRIx64 " (mda)", + e->id); + queue_envelope_load(e->id, &evp); evp.lasttry = time(NULL); - fd = queue_message_fd_r(evpid_to_msgid(evpid)); + fd = queue_message_fd_r(evpid_to_msgid(evp.id)); imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_MDA_SESS_NEW, PROC_MDA, 0, fd, &evp, sizeof evp); - backend->schedule(evpid); - stat_increment(STATS_SCHEDULER); stat_increment(STATS_MDA_SESSION); - break; + free(e); + } +} - case D_MTA: { - struct mta_batch mta_batch; +static void +scheduler_process_mta(struct scheduler_batch *batch) +{ + struct envelope evp; + struct mta_batch mta_batch; + struct id_list *e; - /* FIXME */ - if (! backend->fetch(batch, &evpid)) - goto end; - if (! queue_envelope_load(evpid, &evp)) - goto end; + queue_envelope_load(batch->evpids->id, &evp); - bzero(&mta_batch, sizeof mta_batch); - mta_batch.id = arc4random(); - mta_batch.relay = evp.agent.mta.relay; + bzero(&mta_batch, sizeof mta_batch); + mta_batch.id = arc4random(); + mta_batch.relay = evp.agent.mta.relay; - imsg_compose_event(env->sc_ievs[PROC_QUEUE], - IMSG_BATCH_CREATE, PROC_MTA, 0, -1, &mta_batch, - sizeof mta_batch); + imsg_compose_event(env->sc_ievs[PROC_QUEUE], + IMSG_BATCH_CREATE, PROC_MTA, 0, -1, &mta_batch, + sizeof mta_batch); - while (backend->fetch(batch, &evpid)) { - if (! queue_envelope_load(evpid, &evp)) - goto end; - evp.lasttry = time(NULL); /* FIXME */ - evp.batch_id = mta_batch.id; + while ((e = batch->evpids)) { + batch->evpids = e->next; + log_debug("scheduler: scheduling evp:%016" PRIx64 " (mta)", + e->id); + queue_envelope_load(e->id, &evp); + evp.lasttry = time(NULL); + evp.batch_id = mta_batch.id; + imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_BATCH_APPEND, + PROC_MTA, 0, -1, &evp, sizeof evp); + free(e); + stat_increment(STATS_SCHEDULER); + } - imsg_compose_event(env->sc_ievs[PROC_QUEUE], - IMSG_BATCH_APPEND, PROC_MTA, 0, -1, &evp, - sizeof evp); + imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_BATCH_CLOSE, + PROC_MTA, 0, -1, &mta_batch, sizeof mta_batch); - backend->schedule(evpid); - stat_increment(STATS_SCHEDULER); - } + stat_increment(STATS_MTA_SESSION); +} - imsg_compose_event(env->sc_ievs[PROC_QUEUE], - IMSG_BATCH_CLOSE, PROC_MTA, 0, -1, &mta_batch, - sizeof mta_batch); +void +scheduler_envelope_update(struct envelope *e) +{ + struct scheduler_info si; - stat_increment(STATS_MTA_SESSION); - break; - } - - default: - fatalx("scheduler_process_batchqueue: unknown type"); - } + scheduler_info(&si, e); + backend->update(&si); + scheduler_reset_events(); +} -end: - backend->close(batch); - return 1; +void +scheduler_envelope_delete(struct envelope *e) +{ + backend->delete(e->id); + scheduler_reset_events(); } static int @@ -506,7 +490,7 @@ scheduler_load_message(u_int32_t msgid) struct qwalk *q; u_int64_t evpid; struct envelope envelope; - struct scheduler_info si; + struct scheduler_info si; q = qwalk_new(msgid); while (qwalk(q, &evpid)) { @@ -515,98 +499,8 @@ scheduler_load_message(u_int32_t msgid) scheduler_info(&si, &envelope); backend->insert(&si); } - qwalk_close(q); + qwalk_close(q); + backend->commit(msgid); return 1; } - -static int -scheduler_check_loop(struct envelope *ep) -{ - int fd; - FILE *fp; - char *buf, *lbuf; - size_t len; - struct mailaddr maddr; - int ret = 0; - int rcvcount = 0; - - fd = queue_message_fd_r(evpid_to_msgid(ep->id)); - if ((fp = fdopen(fd, "r")) == NULL) - fatal("fdopen"); - - lbuf = NULL; - while ((buf = fgetln(fp, &len))) { - if (buf[len - 1] == '\n') - buf[len - 1] = '\0'; - else { - /* EOF without EOL, copy and add the NUL */ - if ((lbuf = malloc(len + 1)) == NULL) - err(1, NULL); - memcpy(lbuf, buf, len); - lbuf[len] = '\0'; - buf = lbuf; - } - - if (strchr(buf, ':') == NULL && !isspace((int)*buf)) - break; - - if (strncasecmp("Received: ", buf, 10) == 0) { - rcvcount++; - if (rcvcount == MAX_HOPS_COUNT) { - ret = 1; - break; - } - } - - else if (strncasecmp("Delivered-To: ", buf, 14) == 0) { - struct mailaddr dest; - - bzero(&maddr, sizeof (struct mailaddr)); - if (! email_to_mailaddr(&maddr, buf + 14)) - continue; - - dest = ep->dest; - if (ep->type == D_BOUNCE) - dest = ep->sender; - - if (strcasecmp(maddr.user, dest.user) == 0 && - strcasecmp(maddr.domain, dest.domain) == 0) { - ret = 1; - break; - } - } - } - free(lbuf); - - fclose(fp); - return ret; -} - -static void -scheduler_remove(u_int64_t id) -{ - void *msg; - - /* removing by evpid */ - if (id > 0xffffffffL) { - scheduler_remove_envelope(id); - return; - } - - /* removing by msgid */ - msg = backend->message(id); - while (backend->fetch(msg, &id)) - scheduler_remove_envelope(id); - backend->close(msg); -} - -static void -scheduler_remove_envelope(u_int64_t evpid) -{ - struct envelope evp; - - evp.id = evpid; - queue_envelope_delete(&evp); - backend->remove(evpid); -} diff --git a/usr.sbin/smtpd/scheduler_backend.c b/usr.sbin/smtpd/scheduler_backend.c index e22411fb7d9..7dafbf488b4 100644 --- a/usr.sbin/smtpd/scheduler_backend.c +++ b/usr.sbin/smtpd/scheduler_backend.c @@ -1,4 +1,4 @@ -/* $OpenBSD: scheduler_backend.c,v 1.2 2012/07/28 12:06:41 chl Exp $ */ +/* $OpenBSD: scheduler_backend.c,v 1.3 2012/08/08 08:50:42 eric Exp $ */ /* * Copyright (c) 2012 Gilles Chehade @@ -57,3 +57,24 @@ scheduler_info(struct scheduler_info *sched, struct envelope *evp) sched->expire = evp->expire; sched->retry = evp->retry; } + +time_t +scheduler_compute_schedule(struct scheduler_info *sched) +{ + time_t delay; + + if (sched->retry == 0) + delay = 0; +#if 0 + /* for testing scheduler sleep */ + delay == arc4random() % 30; +#endif + else if (sched->retry < 4) + delay = (sched->retry * 15 * 60); + else if (sched->retry < 8) + delay = ((sched->retry - 3) * 60 * 60); + else + delay = ((sched->retry - 7) * 24 * 60 * 60); + + return (sched->creation + delay); +} diff --git a/usr.sbin/smtpd/scheduler_ramqueue.c b/usr.sbin/smtpd/scheduler_ramqueue.c index a2ab2f1254d..07717fcd616 100644 --- a/usr.sbin/smtpd/scheduler_ramqueue.c +++ b/usr.sbin/smtpd/scheduler_ramqueue.c @@ -1,7 +1,8 @@ -/* $OpenBSD: scheduler_ramqueue.c,v 1.11 2012/07/10 11:13:40 gilles Exp $ */ +/* $OpenBSD: scheduler_ramqueue.c,v 1.12 2012/08/08 08:50:42 eric Exp $ */ /* * Copyright (c) 2012 Gilles Chehade + * Copyright (c) 2012 Eric Faurot * * Permission to use, copy, modify, and distribute this software for any * purpose with or without fee is hereby granted, provided that the above @@ -36,787 +37,703 @@ #include "smtpd.h" #include "log.h" +SPLAY_HEAD(hosttree, rq_host); +TAILQ_HEAD(evplist, rq_envelope); -struct ramqueue_host { - RB_ENTRY(ramqueue_host) hosttree_entry; - TAILQ_HEAD(,ramqueue_batch) batch_queue; - char hostname[MAXHOSTNAMELEN]; +struct rq_host { + SPLAY_ENTRY(rq_host) hosttree_entry; + char host[MAXHOSTNAMELEN]; + struct tree batches; }; -struct ramqueue_batch { - enum delivery_type type; - TAILQ_ENTRY(ramqueue_batch) batch_entry; - TAILQ_HEAD(,ramqueue_envelope) envelope_queue; - struct ramqueue_host *rq_host; - u_int64_t b_id; - u_int32_t msgid; - u_int32_t evpcnt; -}; -struct ramqueue_envelope { - TAILQ_ENTRY(ramqueue_envelope) queue_entry; - TAILQ_ENTRY(ramqueue_envelope) batchqueue_entry; - RB_ENTRY(ramqueue_envelope) evptree_entry; - struct ramqueue_batch *rq_batch; - struct ramqueue_message *rq_msg; - struct ramqueue_host *rq_host; - u_int64_t evpid; - time_t sched; -}; -struct ramqueue_message { - RB_ENTRY(ramqueue_message) msgtree_entry; - RB_HEAD(evptree, ramqueue_envelope) evptree; - u_int32_t msgid; - u_int32_t evpcnt; + +struct rq_message { + uint32_t msgid; + struct tree envelopes; }; -struct ramqueue { - RB_HEAD(hosttree, ramqueue_host) hosttree; - RB_HEAD(msgtree, ramqueue_message) msgtree; - RB_HEAD(offloadtree, ramqueue_envelope) offloadtree; - TAILQ_HEAD(,ramqueue_envelope) queue; + +struct rq_batch { + uint32_t msgid; + struct tree envelopes; + struct rq_host *host; }; -RB_PROTOTYPE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp); -RB_PROTOTYPE(msgtree, ramqueue_message, msg_entry, ramqueue_msg_cmp); -RB_PROTOTYPE(evptree, ramqueue_envelope, evp_entry, ramqueue_evp_cmp); -RB_PROTOTYPE(offloadtree, ramqueue_envelope, evp_entry, ramqueue_evp_cmp); +struct rq_envelope { + TAILQ_ENTRY(rq_envelope) entry; + + uint64_t evpid; + int type; + +#define RQ_ENVELOPE_INFLIGHT 0x01 +#define RQ_ENVELOPE_EXPIRED 0x02 + uint8_t flags; -enum ramqueue_iter_type { - RAMQUEUE_ITER_HOST, - RAMQUEUE_ITER_BATCH, - RAMQUEUE_ITER_MESSAGE, - RAMQUEUE_ITER_QUEUE + time_t sched; + time_t expire; + + struct rq_message *message; + struct rq_batch *batch; + struct evplist *queue; }; -struct ramqueue_iter { - enum ramqueue_iter_type type; - union { - struct ramqueue_host *host; - struct ramqueue_batch *batch; - struct ramqueue_message *message; - } u; +struct rq_queue { + struct hosttree hosts; + struct tree messages; + struct evplist mda; + struct evplist mta; + struct evplist bounce; + struct evplist inflight; + struct tree expired; + struct tree removed; }; +static void scheduler_ramqueue_init(void); +static void scheduler_ramqueue_insert(struct scheduler_info *); +static void scheduler_ramqueue_commit(uint32_t); +static void scheduler_ramqueue_rollback(uint32_t); +static void scheduler_ramqueue_update(struct scheduler_info *); +static void scheduler_ramqueue_delete(uint64_t); +static void scheduler_ramqueue_batch(int, time_t, struct scheduler_batch *); +static void scheduler_ramqueue_schedule(uint64_t); +static void scheduler_ramqueue_remove(uint64_t); + +static int scheduler_ramqueue_next(int, uint64_t *, time_t *); +static void sorted_insert(struct evplist *, struct rq_envelope *); +static void sorted_merge(struct evplist *, struct evplist *); + -static int ramqueue_host_cmp(struct ramqueue_host *, struct ramqueue_host *); -static int ramqueue_msg_cmp(struct ramqueue_message *, struct ramqueue_message *); -static int ramqueue_evp_cmp(struct ramqueue_envelope *, struct ramqueue_envelope *); -static struct ramqueue_host *ramqueue_lookup_host(char *); -static struct ramqueue_host *ramqueue_insert_host(char *); -static void ramqueue_remove_host(struct ramqueue_host *); -static struct ramqueue_batch *ramqueue_lookup_batch(struct ramqueue_host *, - u_int32_t); -static struct ramqueue_batch *ramqueue_insert_batch(struct ramqueue_host *, - u_int32_t); -static void ramqueue_remove_batch(struct ramqueue_host *, struct ramqueue_batch *); -static struct ramqueue_message *ramqueue_lookup_message(u_int32_t); -static struct ramqueue_message *ramqueue_insert_message(u_int32_t); -static void ramqueue_remove_message(struct ramqueue_message *); - -static struct ramqueue_envelope *ramqueue_lookup_envelope(u_int64_t); -static struct ramqueue_envelope *ramqueue_lookup_offload(u_int64_t); - - -/*NEEDSFIX*/ -static int ramqueue_expire(struct envelope *); -static time_t ramqueue_next_schedule(struct scheduler_info *, time_t); - -static void scheduler_ramqueue_init(void); -static int scheduler_ramqueue_setup(void); -static int scheduler_ramqueue_next(u_int64_t *, time_t *); -static void scheduler_ramqueue_insert(struct scheduler_info *); -static void scheduler_ramqueue_schedule(u_int64_t); -static void scheduler_ramqueue_remove(u_int64_t); -static void *scheduler_ramqueue_host(char *); -static void *scheduler_ramqueue_message(u_int32_t); -static void *scheduler_ramqueue_batch(u_int64_t); -static void *scheduler_ramqueue_queue(void); -static void scheduler_ramqueue_close(void *); -static int scheduler_ramqueue_fetch(void *, u_int64_t *); -static int scheduler_ramqueue_force(u_int64_t); -static void scheduler_ramqueue_display(void); +static void rq_queue_init(struct rq_queue *); +static void rq_queue_merge(struct rq_queue *, struct rq_queue *); +static void rq_queue_dump(struct rq_queue *, const char *, time_t); +static void rq_envelope_delete(struct rq_queue *, struct rq_envelope *); +static const char *rq_envelope_to_text(struct rq_envelope *, time_t); +static struct rq_host *rq_host_lookup(struct hosttree *, char *); +static struct rq_host *rq_host_create(struct hosttree *, char *); +static int rq_host_cmp(struct rq_host *, struct rq_host *); + +SPLAY_PROTOTYPE(hosttree, rq_host, hosttree_entry, rq_host_cmp); struct scheduler_backend scheduler_backend_ramqueue = { scheduler_ramqueue_init, - scheduler_ramqueue_setup, - scheduler_ramqueue_next, - scheduler_ramqueue_insert, - scheduler_ramqueue_schedule, - scheduler_ramqueue_remove, - scheduler_ramqueue_host, - scheduler_ramqueue_message, - scheduler_ramqueue_batch, - scheduler_ramqueue_queue, - scheduler_ramqueue_close, - scheduler_ramqueue_fetch, - scheduler_ramqueue_force, - scheduler_ramqueue_display -}; -static struct ramqueue ramqueue; -static void -scheduler_ramqueue_display_hosttree(void) -{ - struct ramqueue_host *rq_host; - struct ramqueue_batch *rq_batch; - struct ramqueue_envelope *rq_evp; - - log_debug("\tscheduler_ramqueue: hosttree display"); - RB_FOREACH(rq_host, hosttree, &ramqueue.hosttree) { - log_debug("\t\thost: [%p] %s", rq_host, rq_host->hostname); - TAILQ_FOREACH(rq_batch, &rq_host->batch_queue, batch_entry) { - log_debug("\t\t\tbatch: [%p] %016x", - rq_batch, rq_batch->msgid); - TAILQ_FOREACH(rq_evp, &rq_batch->envelope_queue, - batchqueue_entry) { - log_debug("\t\t\t\tevpid: [%p] %016"PRIx64, - rq_evp, rq_evp->evpid); - } - } - } -} + scheduler_ramqueue_insert, + scheduler_ramqueue_commit, + scheduler_ramqueue_rollback, -static void -scheduler_ramqueue_display_msgtree(void) -{ - struct ramqueue_message *rq_msg; - struct ramqueue_envelope *rq_evp; - - log_debug("\tscheduler_ramqueue: msgtree display"); - RB_FOREACH(rq_msg, msgtree, &ramqueue.msgtree) { - log_debug("\t\tmsg: [%p] %016x", rq_msg, rq_msg->msgid); - RB_FOREACH(rq_evp, evptree, &rq_msg->evptree) { - log_debug("\t\t\tevp: [%p] %016"PRIx64, - rq_evp, rq_evp->evpid); - } - } -} + scheduler_ramqueue_update, + scheduler_ramqueue_delete, -static void -scheduler_ramqueue_display_offloadtree(void) -{ - struct ramqueue_envelope *rq_evp; + scheduler_ramqueue_batch, - log_debug("\tscheduler_ramqueue: offloadtree display"); - RB_FOREACH(rq_evp, offloadtree, &ramqueue.offloadtree) { - log_debug("\t\t\tevp: [%p] %016"PRIx64, - rq_evp, rq_evp->evpid); - } -} + scheduler_ramqueue_schedule, + scheduler_ramqueue_remove, +}; -static void -scheduler_ramqueue_display_queue(void) -{ - struct ramqueue_envelope *rq_evp; +static struct rq_queue ramqueue; +static struct tree updates; - log_debug("\tscheduler_ramqueue: queue display"); - TAILQ_FOREACH(rq_evp, &ramqueue.queue, queue_entry) { - log_debug("\t\tevpid: [%p] [batch: %p], %016"PRIx64, - rq_evp, rq_evp->rq_batch, rq_evp->evpid); - } -} - -static void -scheduler_ramqueue_display(void) -{ - log_debug("scheduler_ramqueue: display"); - scheduler_ramqueue_display_hosttree(); - scheduler_ramqueue_display_msgtree(); - scheduler_ramqueue_display_offloadtree(); - scheduler_ramqueue_display_queue(); -} +extern int verbose; static void scheduler_ramqueue_init(void) { - log_debug("scheduler_ramqueue: init"); - bzero(&ramqueue, sizeof (ramqueue)); - TAILQ_INIT(&ramqueue.queue); - RB_INIT(&ramqueue.hosttree); - RB_INIT(&ramqueue.msgtree); - RB_INIT(&ramqueue.offloadtree); + rq_queue_init(&ramqueue); + tree_init(&updates); } -static int -scheduler_ramqueue_setup(void) +static void +scheduler_ramqueue_insert(struct scheduler_info *si) { - struct envelope envelope; - static struct qwalk *q = NULL; - u_int64_t evpid; - struct scheduler_info si; - - log_debug("scheduler_ramqueue: load"); - - log_info("scheduler_ramqueue: queue loading in progress"); - if (q == NULL) - q = qwalk_new(0); - - while (qwalk(q, &evpid)) { - /* the envelope is already in ramqueue, skip */ - if (ramqueue_lookup_envelope(evpid) || - ramqueue_lookup_offload(evpid)) - continue; - - if (! queue_envelope_load(evpid, &envelope)) { - log_debug("scheduler_ramqueue: evp -> /corrupt"); - queue_message_corrupt(evpid_to_msgid(evpid)); - continue; - } - if (ramqueue_expire(&envelope)) - continue; + uint32_t msgid; + struct rq_queue *update; + struct rq_host *host; + struct rq_message *message; + struct rq_batch *batch; + struct rq_envelope *envelope; - scheduler_info(&si, &envelope); - scheduler_ramqueue_insert(&si); + msgid = evpid_to_msgid(si->evpid); - log_debug("ramqueue: loading interrupted"); - return (0); + /* find/prepare a ramqueue update */ + if ((update = tree_get(&updates, msgid)) == NULL) { + update = xcalloc(1, sizeof *update, "scheduler_insert"); + rq_queue_init(update); + tree_xset(&updates, msgid, update); } - qwalk_close(q); - q = NULL; - log_debug("ramqueue: loading over"); - return (1); -} -static int -scheduler_ramqueue_next(u_int64_t *evpid, time_t *sched) -{ - struct ramqueue_envelope *rq_evp = NULL; + /* find/prepare the host in ramqueue update */ + if ((host = rq_host_lookup(&update->hosts, si->destination)) == NULL) + host = rq_host_create(&update->hosts, si->destination); - log_debug("scheduler_ramqueue: next"); - TAILQ_FOREACH(rq_evp, &ramqueue.queue, queue_entry) { - if (rq_evp->rq_batch->type == D_MDA) - if (env->sc_flags & (SMTPD_MDA_PAUSED|SMTPD_MDA_BUSY)) - continue; - if (rq_evp->rq_batch->type == D_MTA) - if (env->sc_flags & (SMTPD_MTA_PAUSED|SMTPD_MTA_BUSY)) - continue; - if (evpid) - *evpid = rq_evp->evpid; - if (sched) - *sched = rq_evp->sched; - log_debug("scheduler_ramqueue: next: found"); - return 1; + /* find/prepare the hosttree message in ramqueue update */ + if ((batch = tree_get(&host->batches, msgid)) == NULL) { + batch = xcalloc(1, sizeof *batch, "scheduler_insert"); + batch->msgid = msgid; + tree_init(&batch->envelopes); + tree_xset(&host->batches, msgid, batch); } - log_debug("scheduler_ramqueue: next: nothing schedulable"); - return 0; -} - -static void -scheduler_ramqueue_insert(struct scheduler_info *si) -{ - struct ramqueue_host *rq_host; - struct ramqueue_message *rq_msg; - struct ramqueue_batch *rq_batch; - struct ramqueue_envelope *rq_evp, *evp; - u_int32_t msgid; - time_t curtm = time(NULL); - - log_debug("scheduler_ramqueue: insert"); - - rq_evp = ramqueue_lookup_offload(si->evpid); - if (rq_evp) { - rq_msg = rq_evp->rq_msg; - rq_batch = rq_evp->rq_batch; - rq_host = rq_evp->rq_host; - RB_REMOVE(offloadtree, &ramqueue.offloadtree, rq_evp); - } - else { - msgid = evpid_to_msgid(si->evpid); - rq_msg = ramqueue_lookup_message(msgid); - if (rq_msg == NULL) - rq_msg = ramqueue_insert_message(msgid); - - rq_host = ramqueue_lookup_host(si->destination); - if (rq_host == NULL) - rq_host = ramqueue_insert_host(si->destination); - - rq_batch = ramqueue_lookup_batch(rq_host, msgid); - if (rq_batch == NULL) - rq_batch = ramqueue_insert_batch(rq_host, msgid); - - rq_evp = calloc(1, sizeof (*rq_evp)); - if (rq_evp == NULL) - fatal("calloc"); - rq_evp->evpid = si->evpid; - rq_batch->evpcnt++; - rq_msg->evpcnt++; + /* find/prepare the msgtree message in ramqueue update */ + if ((message = tree_get(&update->messages, msgid)) == NULL) { + message = xcalloc(1, sizeof *message, "scheduler_insert"); + message->msgid = msgid; + tree_init(&message->envelopes); + tree_xset(&update->messages, msgid, message); } - rq_evp->sched = ramqueue_next_schedule(si, curtm); - rq_evp->rq_host = rq_host; - rq_evp->rq_batch = rq_batch; - rq_evp->rq_msg = rq_msg; - RB_INSERT(evptree, &rq_msg->evptree, rq_evp); - TAILQ_INSERT_TAIL(&rq_batch->envelope_queue, rq_evp, - batchqueue_entry); - - /* sorted insert */ - TAILQ_FOREACH(evp, &ramqueue.queue, queue_entry) { - if (evp->sched >= rq_evp->sched) { - TAILQ_INSERT_BEFORE(evp, rq_evp, queue_entry); - break; - } + /* create envelope in ramqueue message */ + envelope = xcalloc(1, sizeof *envelope, "scheduler_insert"); + envelope->evpid = si->evpid; + envelope->type = si->type; + envelope->message = message; + envelope->batch = batch; + envelope->sched = scheduler_compute_schedule(si); + envelope->expire = si->creation + si->expire; + + if (envelope->expire < envelope->sched) { + envelope->flags |= RQ_ENVELOPE_EXPIRED; + tree_xset(&update->expired, envelope->evpid, envelope); } - if (evp == NULL) - TAILQ_INSERT_TAIL(&ramqueue.queue, rq_evp, queue_entry); - - stat_increment(STATS_RAMQUEUE_ENVELOPE); -} -static void -scheduler_ramqueue_schedule(u_int64_t evpid) -{ - struct ramqueue_envelope *rq_evp; - struct ramqueue_message *rq_msg; - struct ramqueue_batch *rq_batch; - - log_debug("scheduler_ramqueue: schedule"); + tree_xset(&batch->envelopes, envelope->evpid, envelope); + tree_xset(&message->envelopes, envelope->evpid, envelope); - rq_evp = ramqueue_lookup_envelope(evpid); - rq_msg = rq_evp->rq_msg; - rq_batch = rq_evp->rq_batch; + if (si->type == D_BOUNCE) + envelope->queue = &update->bounce; + else if (si->type == D_MDA) + envelope->queue = &update->mda; + else if (si->type == D_MTA) + envelope->queue = &update->mta; + else + errx(1, "bad type"); - /* remove from msg tree, batch queue and linear queue */ - RB_REMOVE(evptree, &rq_msg->evptree, rq_evp); - TAILQ_REMOVE(&rq_batch->envelope_queue, rq_evp, batchqueue_entry); - TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry); + sorted_insert(envelope->queue, envelope); - /* insert into offload tree*/ - RB_INSERT(offloadtree, &ramqueue.offloadtree, rq_evp); - - /* that's one less envelope to process in the ramqueue */ - stat_decrement(STATS_RAMQUEUE_ENVELOPE); + if (verbose & TRACE_SCHEDULER) + rq_queue_dump(update, "inserted", time(NULL)); } static void -scheduler_ramqueue_remove(u_int64_t evpid) +scheduler_ramqueue_commit(uint32_t msgid) { - struct ramqueue_batch *rq_batch; - struct ramqueue_message *rq_msg; - struct ramqueue_envelope *rq_evp; - struct ramqueue_host *rq_host; - - log_debug("scheduler_ramqueue: remove"); - - rq_evp = ramqueue_lookup_offload(evpid); - if (rq_evp) { - RB_REMOVE(offloadtree, &ramqueue.offloadtree, rq_evp); - rq_msg = rq_evp->rq_msg; - rq_batch = rq_evp->rq_batch; - rq_host = rq_evp->rq_host; - } - else { - rq_evp = ramqueue_lookup_envelope(evpid); - rq_msg = rq_evp->rq_msg; - rq_batch = rq_evp->rq_batch; - rq_host = rq_evp->rq_host; - - RB_REMOVE(evptree, &rq_msg->evptree, rq_evp); - TAILQ_REMOVE(&rq_batch->envelope_queue, rq_evp, batchqueue_entry); - TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry); - stat_decrement(STATS_RAMQUEUE_ENVELOPE); - } + struct rq_queue *update; + time_t now; - rq_batch->evpcnt--; - rq_msg->evpcnt--; + update = tree_xpop(&updates, msgid); - /* check if we are the last of a message */ - if (rq_msg->evpcnt == 0) { - ramqueue_remove_message(rq_msg); + if (verbose & TRACE_SCHEDULER) { + now = time(NULL); + rq_queue_dump(update, "commit update", now); + rq_queue_dump(&ramqueue, "before commit", now); } + rq_queue_merge(&ramqueue, update); - /* check if we are the last of a batch */ - if (rq_batch->evpcnt == 0) { - ramqueue_remove_batch(rq_host, rq_batch); - } - - /* check if we are the last of a host */ - if (TAILQ_FIRST(&rq_host->batch_queue) == NULL) { - ramqueue_remove_host(rq_host); - } + if (verbose & TRACE_SCHEDULER) + rq_queue_dump(&ramqueue, "after commit", now); - free(rq_evp); + free(update); } -static void * -scheduler_ramqueue_host(char *host) +static void +scheduler_ramqueue_rollback(uint32_t msgid) { - struct ramqueue_iter *iter; - struct ramqueue_host *rq_host; - - rq_host = ramqueue_lookup_host(host); - if (rq_host == NULL) - return NULL; + struct rq_queue *update; + struct rq_envelope *envelope; - iter = calloc(1, sizeof *iter); - if (iter == NULL) - err(1, "calloc"); + update = tree_xpop(&updates, msgid); - iter->type = RAMQUEUE_ITER_HOST; - iter->u.host = rq_host; + while ((envelope = TAILQ_FIRST(&update->bounce))) + rq_envelope_delete(update, envelope); + while ((envelope = TAILQ_FIRST(&update->mda))) + rq_envelope_delete(update, envelope); + while ((envelope = TAILQ_FIRST(&update->mta))) + rq_envelope_delete(update, envelope); - return iter; + free(update); } -static void * -scheduler_ramqueue_batch(u_int64_t evpid) +static void +scheduler_ramqueue_update(struct scheduler_info *si) { - struct ramqueue_iter *iter; - struct ramqueue_envelope *rq_evp; + struct rq_message *message; + struct rq_envelope *envelope; + uint32_t msgid; - rq_evp = ramqueue_lookup_envelope(evpid); - if (rq_evp == NULL) - return NULL; + msgid = evpid_to_msgid(si->evpid); + message = tree_xget(&ramqueue.messages, msgid); + envelope = tree_xget(&message->envelopes, si->evpid); - iter = calloc(1, sizeof *iter); - if (iter == NULL) - err(1, "calloc"); + /* it *should* be in-flight */ + if (!(envelope->flags & RQ_ENVELOPE_INFLIGHT)) + log_warnx("evp:%016" PRIx64 " not in-flight", si->evpid); + + envelope->flags &= ~RQ_ENVELOPE_INFLIGHT; + envelope->sched = scheduler_compute_schedule(si); + if (envelope->expire < envelope->sched) { + envelope->flags |= RQ_ENVELOPE_EXPIRED; + tree_xset(&ramqueue.expired, envelope->evpid, envelope); + } - iter->type = RAMQUEUE_ITER_BATCH; - iter->u.batch = rq_evp->rq_batch; + TAILQ_REMOVE(envelope->queue, envelope, entry); + if (si->type == D_BOUNCE) + envelope->queue = &ramqueue.bounce; + else if (si->type == D_MDA) + envelope->queue = &ramqueue.mda; + else if (si->type == D_MTA) + envelope->queue = &ramqueue.mta; - return iter; + sorted_insert(envelope->queue, envelope); } -static void * -scheduler_ramqueue_message(u_int32_t msgid) +static void +scheduler_ramqueue_delete(uint64_t evpid) { - struct ramqueue_iter *iter; - struct ramqueue_message *rq_msg; - - rq_msg = ramqueue_lookup_message(msgid); - if (rq_msg == NULL) - return NULL; - - iter = calloc(1, sizeof *iter); - if (iter == NULL) - err(1, "calloc"); + struct rq_message *message; + struct rq_envelope *envelope; + uint32_t msgid; - iter->type = RAMQUEUE_ITER_MESSAGE; - iter->u.message = rq_msg; + msgid = evpid_to_msgid(evpid); + message = tree_xget(&ramqueue.messages, msgid); + envelope = tree_xget(&message->envelopes, evpid); - return iter; + /* it *must* be in-flight */ + if (!(envelope->flags & RQ_ENVELOPE_INFLIGHT)) + errx(1, "evp:%016" PRIx64 " not in-flight", evpid); + rq_envelope_delete(&ramqueue, envelope); } -static void * -scheduler_ramqueue_queue(void) -{ - struct ramqueue_iter *iter; +static int +scheduler_ramqueue_next(int typemask, uint64_t *evpid, time_t *sched) +{ + struct rq_envelope *evp_mda = NULL; + struct rq_envelope *evp_mta = NULL; + struct rq_envelope *evp_bounce = NULL; + struct rq_envelope *envelope = NULL; + + if (verbose & TRACE_SCHEDULER) + rq_queue_dump(&ramqueue, "next", time(NULL)); + + *sched = 0; + + if (typemask & SCHED_REMOVE && tree_root(&ramqueue.removed, evpid, NULL)) + return (1); + if (typemask & SCHED_EXPIRE && tree_root(&ramqueue.expired, evpid, NULL)) + return (1); + + /* fetch first envelope from each queue */ + if (typemask & SCHED_BOUNCE) + evp_bounce = TAILQ_FIRST(&ramqueue.bounce); + if (typemask & SCHED_MDA) + evp_mda = TAILQ_FIRST(&ramqueue.mda); + if (typemask & SCHED_MTA) + evp_mta = TAILQ_FIRST(&ramqueue.mta); + + /* set current envelope to either one */ + if (evp_bounce) + envelope = evp_bounce; + else if (evp_mda) + envelope = evp_mda; + else if (evp_mta) + envelope = evp_mta; + else + return (0); - iter = calloc(1, sizeof *iter); - if (iter == NULL) - err(1, "calloc"); + /* figure out which one should be scheduled first */ + if (evp_bounce && evp_bounce->sched < envelope->sched) + envelope = evp_bounce; + if (evp_mda && evp_mda->sched < envelope->sched) + envelope = evp_mda; + if (evp_mta && evp_mta->sched < envelope->sched) + envelope = evp_mta; - iter->type = RAMQUEUE_ITER_QUEUE; + *evpid = envelope->evpid; + *sched = envelope->sched; - return iter; + return (1); } static void -scheduler_ramqueue_close(void *hdl) -{ - free(hdl); -} - -int -scheduler_ramqueue_fetch(void *hdl, u_int64_t *evpid) -{ - struct ramqueue_iter *iter = hdl; - struct ramqueue_envelope *rq_evp; - struct ramqueue_batch *rq_batch; - - switch (iter->type) { - case RAMQUEUE_ITER_HOST: - rq_batch = TAILQ_FIRST(&iter->u.host->batch_queue); - if (rq_batch == NULL) - break; - rq_evp = TAILQ_FIRST(&rq_batch->envelope_queue); - if (rq_evp == NULL) - break; - *evpid = rq_evp->evpid; - return 1; - - case RAMQUEUE_ITER_BATCH: - rq_evp = TAILQ_FIRST(&iter->u.batch->envelope_queue); - if (rq_evp == NULL) - break; - *evpid = rq_evp->evpid; - return 1; - - case RAMQUEUE_ITER_MESSAGE: - rq_evp = RB_ROOT(&iter->u.message->evptree); - if (rq_evp == NULL) - break; - *evpid = rq_evp->evpid; - return 1; - - case RAMQUEUE_ITER_QUEUE: - rq_evp = TAILQ_FIRST(&ramqueue.queue); - if (rq_evp == NULL) - break; - *evpid = rq_evp->evpid; - return 1; +scheduler_ramqueue_batch(int typemask, time_t curr, struct scheduler_batch *ret) +{ + struct rq_message *message; + struct rq_batch *batch; + struct rq_envelope *envelope; + struct id_list *item; + uint64_t evpid; + void *i; + int type; + time_t sched; + + ret->evpids = NULL; + + if (!scheduler_ramqueue_next(typemask, &evpid, &sched)) { + ret->type = SCHED_NONE; + return; } - return 0; -} - -static int -scheduler_ramqueue_force(u_int64_t id) -{ - struct ramqueue_envelope *rq_evp; - struct ramqueue_message *rq_msg; - int ret; - - /* schedule *all* */ - if (id == 0) { - ret = 0; - TAILQ_FOREACH(rq_evp, &ramqueue.queue, queue_entry) { - rq_evp->sched = 0; - ret++; + if (tree_get(&ramqueue.removed, evpid)) { + ret->type = SCHED_REMOVE; + while (tree_poproot(&ramqueue.removed, &evpid, NULL)) { + item = xmalloc(sizeof *item, "schedule_batch"); + item->id = evpid; + item->next = ret->evpids; + ret->evpids = item; } - return ret; + return; } - /* scheduling by evpid */ - if (id > 0xffffffffL) { - rq_evp = ramqueue_lookup_envelope(id); - if (rq_evp == NULL) - return 0; + message = tree_xget(&ramqueue.messages, evpid_to_msgid(evpid)); + envelope = tree_xget(&message->envelopes, evpid); + + /* if the envelope has expired, return the expired list */ + if (envelope->flags & RQ_ENVELOPE_EXPIRED) { + ret->type = SCHED_EXPIRE; + while (tree_poproot(&ramqueue.expired, &evpid, (void**)&envelope)) { + TAILQ_REMOVE(envelope->queue, envelope, entry); + TAILQ_INSERT_TAIL(&ramqueue.inflight, envelope, entry); + envelope->flags |= RQ_ENVELOPE_INFLIGHT; + item = xmalloc(sizeof *item, "schedule_batch"); + item->id = evpid; + item->next = ret->evpids; + ret->evpids = item; + } + return; + } - rq_evp->sched = 0; - TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry); - TAILQ_INSERT_HEAD(&ramqueue.queue, rq_evp, queue_entry); - return 1; + if (sched > curr) { + ret->type = SCHED_DELAY; + ret->delay = sched - curr; + return; } - rq_msg = ramqueue_lookup_message(id); - if (rq_msg == NULL) - return 0; - - /* scheduling by msgid */ - ret = 0; - RB_FOREACH(rq_evp, evptree, &rq_msg->evptree) { - rq_evp->sched = 0; - TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry); - TAILQ_INSERT_HEAD(&ramqueue.queue, rq_evp, queue_entry); - ret++; + batch = envelope->batch; + type = envelope->type; + if (type == D_BOUNCE) + ret->type = SCHED_BOUNCE; + else if (type == D_MDA) + ret->type = SCHED_MDA; + else if (type == D_MTA) + ret->type = SCHED_MTA; + + i = NULL; + while((tree_iter(&batch->envelopes, &i, &evpid, (void*)&envelope))) { + if (envelope->type != type) + continue; + if (envelope->sched > curr) + continue; + if (envelope->flags & RQ_ENVELOPE_INFLIGHT) + continue; + if (envelope->flags & RQ_ENVELOPE_EXPIRED) + continue; + TAILQ_REMOVE(envelope->queue, envelope, entry); + TAILQ_INSERT_TAIL(&ramqueue.inflight, envelope, entry); + envelope->queue = &ramqueue.inflight; + envelope->flags |= RQ_ENVELOPE_INFLIGHT; + item = xmalloc(sizeof *item, "schedule_batch"); + item->id = evpid; + item->next = ret->evpids; + ret->evpids = item; } - return ret; } -static struct ramqueue_host * -ramqueue_lookup_host(char *host) -{ - struct ramqueue_host hostkey; - strlcpy(hostkey.hostname, host, sizeof(hostkey.hostname)); - return RB_FIND(hosttree, &ramqueue.hosttree, &hostkey); -} - -static struct ramqueue_message * -ramqueue_lookup_message(u_int32_t msgid) +static void +scheduler_ramqueue_schedule(uint64_t evpid) { - struct ramqueue_message msgkey; + struct rq_message *message; + struct rq_envelope *envelope; + uint32_t msgid; - msgkey.msgid = msgid; - return RB_FIND(msgtree, &ramqueue.msgtree, &msgkey); -} + msgid = evpid_to_msgid(evpid); + if ((message = tree_get(&ramqueue.messages, msgid)) == NULL) + return; + if ((envelope = tree_xget(&message->envelopes, evpid)) == NULL) + return; + if (envelope->flags & RQ_ENVELOPE_INFLIGHT) + return; -static struct ramqueue_envelope * -ramqueue_lookup_offload(u_int64_t evpid) -{ - struct ramqueue_envelope evpkey; + envelope->sched = time(NULL); + TAILQ_REMOVE(envelope->queue, envelope, entry); + sorted_insert(envelope->queue, envelope); +} - evpkey.evpid = evpid; - return RB_FIND(offloadtree, &ramqueue.offloadtree, &evpkey); +static void +scheduler_ramqueue_remove(uint64_t evpid) +{ + struct rq_message *message; + struct rq_envelope *envelope; + uint32_t msgid; + struct evplist rmlist; + void *i; + + if (evpid > 0xffffffff) { + msgid = evpid_to_msgid(evpid); + if ((message = tree_get(&ramqueue.messages, msgid)) == NULL) + return; + if ((envelope = tree_xget(&message->envelopes, evpid)) == NULL) + return; + if (envelope->flags & RQ_ENVELOPE_INFLIGHT) + return; + rq_envelope_delete(&ramqueue, envelope); + tree_xset(&ramqueue.removed, evpid, &ramqueue); + } + else { + msgid = evpid; + if ((message = tree_get(&ramqueue.messages, msgid)) == NULL) + return; + + TAILQ_INIT(&rmlist); + i = NULL; + while (tree_iter(&message->envelopes, &i, &evpid, + (void*)(&envelope))) { + if (envelope->flags & RQ_ENVELOPE_INFLIGHT) + continue; + tree_xset(&ramqueue.removed, evpid, &ramqueue); + TAILQ_REMOVE(envelope->queue, envelope, entry); + envelope->queue = &rmlist; + TAILQ_INSERT_HEAD(&rmlist, envelope, entry); + } + while((envelope = TAILQ_FIRST(&rmlist))) + rq_envelope_delete(&ramqueue, envelope); + } } -static struct ramqueue_envelope * -ramqueue_lookup_envelope(u_int64_t evpid) +static void +sorted_insert(struct evplist *list, struct rq_envelope *evp) { - struct ramqueue_message *rq_msg; - struct ramqueue_envelope evpkey; + struct rq_envelope *item; - rq_msg = ramqueue_lookup_message(evpid_to_msgid(evpid)); - if (rq_msg == NULL) - return NULL; - - evpkey.evpid = evpid; - return RB_FIND(evptree, &rq_msg->evptree, &evpkey); + TAILQ_FOREACH(item, list, entry) { + if (evp->sched < item->sched) { + TAILQ_INSERT_BEFORE(item, evp, entry); + return; + } + } + TAILQ_INSERT_TAIL(list, evp, entry); } -static struct ramqueue_batch * -ramqueue_lookup_batch(struct ramqueue_host *rq_host, u_int32_t msgid) +static void +sorted_merge(struct evplist *list, struct evplist *from) { - struct ramqueue_batch *rq_batch; + struct rq_envelope *e; - TAILQ_FOREACH(rq_batch, &rq_host->batch_queue, batch_entry) { - if (rq_batch->msgid == msgid) - return rq_batch; + /* XXX this is O(not good enough) */ + while ((e = TAILQ_LAST(from, evplist))) { + TAILQ_REMOVE(from, e, entry); + sorted_insert(list, e); + e->queue = list; } - - return NULL; } -static int -ramqueue_expire(struct envelope *envelope) +static void +rq_queue_init(struct rq_queue *rq) { - struct envelope bounce; - struct scheduler_info si; - time_t curtm; - - curtm = time(NULL); - if (curtm - envelope->creation >= envelope->expire) { - envelope_set_errormsg(envelope, - "message expired after sitting in queue for %d days", - envelope->expire / 60 / 60 / 24); - bounce_record_message(envelope, &bounce); - - scheduler_info(&si, &bounce); - scheduler_ramqueue_insert(&si); - - log_debug("#### %s: queue_envelope_delete: %016" PRIx64, - __func__, envelope->id); - queue_envelope_delete(envelope); - return 1; - } - return 0; + bzero(rq, sizeof *rq); + + tree_init(&rq->messages); + tree_init(&rq->expired); + tree_init(&rq->removed); + SPLAY_INIT(&rq->hosts); + TAILQ_INIT(&rq->mda); + TAILQ_INIT(&rq->mta); + TAILQ_INIT(&rq->bounce); + TAILQ_INIT(&rq->inflight); } -static time_t -ramqueue_next_schedule(struct scheduler_info *si, time_t curtm) -{ - time_t delay; - - if (si->lasttry == 0) - return curtm; - - delay = SMTPD_QUEUE_MAXINTERVAL; - - if (si->type == D_MDA || - si->type == D_BOUNCE) { - if (si->retry < 5) - return curtm; - - if (si->retry < 15) - delay = (si->retry * 60) + arc4random_uniform(60); +static void +rq_queue_merge(struct rq_queue *rq, struct rq_queue *update) +{ + struct rq_message *message, *tomessage; + struct rq_batch *batch, *tobatch; + struct rq_host *host, *tohost; + struct rq_envelope *envelope; + uint64_t id; + void *i; + + /* merge host tree */ + while ((host = SPLAY_ROOT(&update->hosts))) { + SPLAY_REMOVE(hosttree, &update->hosts, host); + tohost = rq_host_lookup(&rq->hosts, host->host); + if (tohost == NULL) + tohost = rq_host_create(&rq->hosts, host->host); + /* merge batches for current host */ + while (tree_poproot(&host->batches, &id, (void*)&batch)) { + tobatch = tree_get(&tohost->batches, batch->msgid); + if (tobatch == NULL) { + /* batch does not exist. re-use structure */ + batch->host = tohost; + tree_xset(&tohost->batches, batch->msgid, batch); + continue; + } + /* need to re-link all envelopes before merging them */ + i = NULL; + while((tree_iter(&batch->envelopes, &i, &id, + (void*)&envelope))) + envelope->batch = tobatch; + tree_merge(&tobatch->envelopes, &batch->envelopes); + free(batch); + } + free(host); } - if (si->type == D_MTA) { - if (si->retry < 3) - delay = SMTPD_QUEUE_INTERVAL; - else if (si->retry <= 7) { - delay = SMTPD_QUEUE_INTERVAL * (1 << (si->retry - 3)); - if (delay > SMTPD_QUEUE_MAXINTERVAL) - delay = SMTPD_QUEUE_MAXINTERVAL; + while (tree_poproot(&update->messages, &id, (void*)&message)) { + if ((tomessage = tree_get(&rq->messages, id)) == NULL) { + /* message does not exist. re-use structure */ + tree_xset(&rq->messages, id, message); + continue; } + /* need to re-link all envelopes before merging them */ + i = NULL; + while((tree_iter(&message->envelopes, &i, &id, + (void*)&envelope))) + envelope->message = tomessage; + tree_merge(&tomessage->envelopes, &message->envelopes); + free(message); } - if (curtm >= si->lasttry + delay) - return curtm; + sorted_merge(&rq->bounce, &update->bounce); + sorted_merge(&rq->mda, &update->mda); + sorted_merge(&rq->mta, &update->mta); - return curtm + delay; + tree_merge(&rq->expired, &update->expired); + tree_merge(&rq->removed, &update->removed); } -static struct ramqueue_message * -ramqueue_insert_message(u_int32_t msgid) +static void +rq_envelope_delete(struct rq_queue *rq, struct rq_envelope *envelope) { - struct ramqueue_message *rq_msg; + struct rq_message *message; + struct rq_batch *batch; + struct rq_host *host; - rq_msg = calloc(1, sizeof (*rq_msg)); - if (rq_msg == NULL) - fatal("calloc"); - rq_msg->msgid = msgid; - RB_INSERT(msgtree, &ramqueue.msgtree, rq_msg); - RB_INIT(&rq_msg->evptree); - stat_increment(STATS_RAMQUEUE_MESSAGE); + if (envelope->flags & RQ_ENVELOPE_EXPIRED) + tree_pop(&rq->expired, envelope->evpid); - return rq_msg; -} + TAILQ_REMOVE(envelope->queue, envelope, entry); + batch = envelope->batch; + message = envelope->message; + host = batch->host; -static struct ramqueue_host * -ramqueue_insert_host(char *host) -{ - struct ramqueue_host *rq_host; - - rq_host = calloc(1, sizeof (*rq_host)); - if (rq_host == NULL) - fatal("calloc"); - strlcpy(rq_host->hostname, host, sizeof(rq_host->hostname)); - TAILQ_INIT(&rq_host->batch_queue); - RB_INSERT(hosttree, &ramqueue.hosttree, rq_host); - stat_increment(STATS_RAMQUEUE_HOST); + tree_xpop(&message->envelopes, envelope->evpid); + if (tree_empty(&message->envelopes)) { + tree_xpop(&rq->messages, message->msgid); + free(message); + } - return rq_host; + tree_xpop(&batch->envelopes, envelope->evpid); + if (tree_empty(&batch->envelopes)) { + tree_xpop(&host->batches, message->msgid); + if (tree_empty(&host->batches)) { + SPLAY_REMOVE(hosttree, &rq->hosts, host); + free(host); + } + free(batch); + } + free(envelope); } -static struct ramqueue_batch * -ramqueue_insert_batch(struct ramqueue_host *rq_host, u_int32_t msgid) +static const char * +rq_envelope_to_text(struct rq_envelope *e, time_t tref) { - struct ramqueue_batch *rq_batch; + static char buf[256]; + char t[64]; - rq_batch = calloc(1, sizeof (*rq_batch)); - if (rq_batch == NULL) - fatal("calloc"); - rq_batch->b_id = generate_uid(); - rq_batch->rq_host = rq_host; - rq_batch->msgid = msgid; + snprintf(buf, sizeof buf, "evp:%016" PRIx64 " [", e->evpid); - TAILQ_INIT(&rq_batch->envelope_queue); - TAILQ_INSERT_TAIL(&rq_host->batch_queue, rq_batch, batch_entry); + if (e->type == D_BOUNCE) + strlcat(buf, "bounce", sizeof buf); + else if (e->type == D_MDA) + strlcat(buf, "mda", sizeof buf); + else if (e->type == D_MTA) + strlcat(buf, "mta", sizeof buf); - stat_increment(STATS_RAMQUEUE_BATCH); + snprintf(t, sizeof t, ",sched=%s", duration_to_text(e->sched - tref)); + strlcat(buf, t, sizeof buf); + snprintf(t, sizeof t, ",exp=%s", duration_to_text(e->expire - tref)); + strlcat(buf, t, sizeof buf); - return rq_batch; -} + if (e->flags & RQ_ENVELOPE_EXPIRED) + strlcat(buf, ",expired", sizeof buf); + if (e->flags & RQ_ENVELOPE_INFLIGHT) + strlcat(buf, ",in-flight", sizeof buf); -static void -ramqueue_remove_host(struct ramqueue_host *rq_host) -{ - RB_REMOVE(hosttree, &ramqueue.hosttree, rq_host); - free(rq_host); - stat_decrement(STATS_RAMQUEUE_HOST); + strlcat(buf, "]", sizeof buf); + + return (buf); } static void -ramqueue_remove_message(struct ramqueue_message *rq_msg) -{ - RB_REMOVE(msgtree, &ramqueue.msgtree, rq_msg); - free(rq_msg); - stat_decrement(STATS_RAMQUEUE_MESSAGE); -} +rq_queue_dump(struct rq_queue *rq, const char * name, time_t tref) +{ + struct rq_host *host; + struct rq_batch *batch; + struct rq_message *message; + struct rq_envelope *envelope; + void *i, *j; + uint64_t id; + + log_debug("/--- ramqueue: %s", name); + SPLAY_FOREACH(host, hosttree, &rq->hosts) { + log_debug("| host:%s", host->host); + i = NULL; + while((tree_iter(&host->batches, &i, &id, (void*)&batch))) { + log_debug("| batch:%08" PRIx32, batch->msgid); + j = NULL; + while((tree_iter(&batch->envelopes, &j, &id, + (void*)&envelope))) + log_debug("| %s", + rq_envelope_to_text(envelope, tref)); + } + } + i = NULL; + while((tree_iter(&rq->messages, &i, &id, (void*)&message))) { + log_debug("| msg:%08" PRIx32, message->msgid); + j = NULL; + while((tree_iter(&message->envelopes, &j, &id, + (void*)&envelope))) + log_debug("| %s", rq_envelope_to_text(envelope, tref)); + } -static void -ramqueue_remove_batch(struct ramqueue_host *rq_host, - struct ramqueue_batch *rq_batch) -{ - TAILQ_REMOVE(&rq_host->batch_queue, rq_batch, batch_entry); - free(rq_batch); - stat_decrement(STATS_RAMQUEUE_BATCH); + log_debug("| bounces:"); + TAILQ_FOREACH(envelope, &rq->bounce, entry) + log_debug("| %s", rq_envelope_to_text(envelope, tref)); + log_debug("| mda:"); + TAILQ_FOREACH(envelope, &rq->mda, entry) + log_debug("| %s", rq_envelope_to_text(envelope, tref)); + log_debug("| mta:"); + TAILQ_FOREACH(envelope, &rq->mta, entry) + log_debug("| %s", rq_envelope_to_text(envelope, tref)); + log_debug("| in-flight:"); + TAILQ_FOREACH(envelope, &rq->inflight, entry) + log_debug("| %s", rq_envelope_to_text(envelope, tref)); + log_debug("\\---"); } static int -ramqueue_host_cmp(struct ramqueue_host *h1, struct ramqueue_host *h2) +rq_host_cmp(struct rq_host *a, struct rq_host *b) { - return strcmp(h1->hostname, h2->hostname); + return (strcmp(a->host, b->host)); } - -static int -ramqueue_msg_cmp(struct ramqueue_message *m1, struct ramqueue_message *m2) +static struct rq_host * +rq_host_lookup(struct hosttree *host_tree, char *host) { - return (m1->msgid < m2->msgid ? -1 : m1->msgid > m2->msgid); + struct rq_host key; + + strlcpy(key.host, host, sizeof key.host); + return (SPLAY_FIND(hosttree, host_tree, &key)); } -static int -ramqueue_evp_cmp(struct ramqueue_envelope *e1, struct ramqueue_envelope *e2) +static struct rq_host * +rq_host_create(struct hosttree *host_tree, char *host) { - return (e1->evpid < e2->evpid ? -1 : e1->evpid > e2->evpid); -} + struct rq_host *rq_host; -RB_GENERATE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp); -RB_GENERATE(msgtree, ramqueue_message, msgtree_entry, ramqueue_msg_cmp); -RB_GENERATE(evptree, ramqueue_envelope, evptree_entry, ramqueue_evp_cmp); -RB_GENERATE(offloadtree, ramqueue_envelope, evptree_entry, ramqueue_evp_cmp); + rq_host = xcalloc(1, sizeof *rq_host, "rq_host_create"); + tree_init(&rq_host->batches); + strlcpy(rq_host->host, host, sizeof rq_host->host); + SPLAY_INSERT(hosttree, host_tree, rq_host); + + return (rq_host); +} +SPLAY_GENERATE(hosttree, rq_host, hosttree_entry, rq_host_cmp); diff --git a/usr.sbin/smtpd/smtpd.h b/usr.sbin/smtpd/smtpd.h index b98cf72f6f9..7941fd73bba 100644 --- a/usr.sbin/smtpd/smtpd.h +++ b/usr.sbin/smtpd/smtpd.h @@ -1,8 +1,9 @@ -/* $OpenBSD: smtpd.h,v 1.314 2012/08/07 21:47:57 eric Exp $ */ +/* $OpenBSD: smtpd.h,v 1.315 2012/08/08 08:50:42 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade * Copyright (c) 2008 Pierre-Yves Ritschard + * Copyright (c) 2012 Eric Faurot * * Permission to use, copy, modify, and distribute this software for any * purpose with or without fee is hereby granted, provided that the above @@ -916,8 +917,8 @@ struct delivery_backend { }; struct scheduler_info { - u_int64_t evpid; - char destination[MAXHOSTNAMELEN]; + u_int64_t evpid; + char destination[MAXHOSTNAMELEN]; enum delivery_type type; time_t creation; @@ -926,30 +927,41 @@ struct scheduler_info { u_int8_t retry; }; +struct id_list { + struct id_list *next; + uint64_t id; +}; + +#define SCHED_NONE 0x00 +#define SCHED_DELAY 0x01 +#define SCHED_REMOVE 0x02 +#define SCHED_EXPIRE 0x04 +#define SCHED_BOUNCE 0x08 +#define SCHED_MDA 0x10 +#define SCHED_MTA 0x20 + +struct scheduler_batch { + int type; + time_t delay; + struct id_list *evpids; +}; + struct scheduler_backend { void (*init)(void); - int (*setup)(void); - - int (*next)(u_int64_t *, time_t *); void (*insert)(struct scheduler_info *); - void (*schedule)(u_int64_t); - void (*remove)(u_int64_t); + void (*commit)(u_int32_t); + void (*rollback)(u_int32_t); - void *(*host)(char *); - void *(*message)(u_int32_t); - void *(*batch)(u_int64_t); - void *(*queue)(void); - void (*close)(void *); + void (*update)(struct scheduler_info *); + void (*delete)(u_int64_t); - int (*fetch)(void *, u_int64_t *); - int (*force)(u_int64_t); + void (*batch)(int, time_t, struct scheduler_batch *); - void (*display)(void); /* may be NULL */ + void (*schedule)(u_int64_t); + void (*remove)(u_int64_t); }; - - extern struct smtpd *env; extern void (*imsg_callback)(struct imsgev *, struct imsg *); @@ -1100,13 +1112,11 @@ void qwalk_close(void *); /* scheduler.c */ pid_t scheduler(void); -void message_reset_flags(struct envelope *); - -/* scheduler.c */ +/* scheduler_bakend.c */ struct scheduler_backend *scheduler_backend_lookup(const char *); void scheduler_info(struct scheduler_info *, struct envelope *); - +time_t scheduler_compute_schedule(struct scheduler_info *); /* smtp.c */ pid_t smtp(void); @@ -1199,6 +1209,7 @@ int valid_localpart(const char *); int valid_domainpart(const char *); char *ss_to_text(struct sockaddr_storage *); char *time_to_text(time_t); +char *duration_to_text(time_t); int secure_file(int, char *, char *, uid_t, int); int lowercase(char *, char *, size_t); void xlowercase(char *, char *, size_t); diff --git a/usr.sbin/smtpd/util.c b/usr.sbin/smtpd/util.c index 7000a366c1d..8fa0beff0c4 100644 --- a/usr.sbin/smtpd/util.c +++ b/usr.sbin/smtpd/util.c @@ -1,4 +1,4 @@ -/* $OpenBSD: util.c,v 1.68 2012/08/07 21:47:58 eric Exp $ */ +/* $OpenBSD: util.c,v 1.69 2012/08/08 08:50:42 eric Exp $ */ /* * Copyright (c) 2000,2001 Markus Friedl. All rights reserved. @@ -538,6 +538,51 @@ time_to_text(time_t when) return buf; } +char * +duration_to_text(time_t t) +{ + static char dst[64]; + char buf[64]; + int d, h, m, s; + + if (t == 0) { + strlcpy(dst, "0s", sizeof dst); + return (dst); + } + + dst[0] = '\0'; + if (t < 0) { + strlcpy(dst, "-", sizeof dst); + t = -t; + } + + s = t % 60; + t /= 60; + m = t % 60; + t /= 60; + h = t % 24; + d = t / 24; + + if (d) { + snprintf(buf, sizeof buf, "%id", d); + strlcat(dst, buf, sizeof dst); + } + if (h) { + snprintf(buf, sizeof buf, "%ih", h); + strlcat(dst, buf, sizeof dst); + } + if (m) { + snprintf(buf, sizeof buf, "%im", m); + strlcat(dst, buf, sizeof dst); + } + if (s) { + snprintf(buf, sizeof buf, "%is", s); + strlcat(dst, buf, sizeof dst); + } + + return (dst); +} + int text_to_netaddr(struct netaddr *netaddr, char *s) { -- cgit v1.2.3