author | Eric Faurot <eric@cvs.openbsd.org> | 2013-10-27 17:47:54 +0000
---|---|---
committer | Eric Faurot <eric@cvs.openbsd.org> | 2013-10-27 17:47:54 +0000
commit | 2299126780a32cd17905cda2365073d145ee0442 (patch)
tree | 170c5c07573663d85e4b9463d686a31e004d83b6 /usr.sbin
parent | e8227a9080380ea759f6cd0c08263e560feaa0e2 (diff)
Implement a feedback mechanism which allows the mta to "hold" envelopes
in the scheduler when it has too many tasks for a given relay. The
envelopes are put on a wait queue, and are not scheduled again until
the mta "releases" some envelopes from that queue.
This prevents too many in-flight envelopes from accumulating, out of reach
for the admin.
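In code terms, the feedback loop on the MTA side boils down to two checks around the relay task count. The sketch below is a simplified distillation, not the patch itself: `struct relay`, `send_hold()`, `send_release()`, `relay_maybe_hold()` and `relay_maybe_release()` are hypothetical stand-ins for `struct mta_relay`, the IMSG_DELIVERY_HOLD / IMSG_DELIVERY_RELEASE messages, and the corresponding branches added to `mta_imsg()` and `mta_route_next_task()` in the diff below.

```c
#include <stdint.h>
#include <stdio.h>

/* Simplified relay state; the real struct mta_relay carries much more. */
struct relay {
	uint64_t id;
	size_t   ntask;
	int      onhold;          /* mirrors the new RELAY_ONHOLD flag */
	int      task_hiwat;      /* 50 by default, per limit_mta_set_defaults() */
	int      task_lowat;      /* 30 by default */
	int      task_release;    /* 10 by default */
};

/* Stand-ins for the IMSG_DELIVERY_HOLD/RELEASE messages sent via the queue. */
static void send_hold(uint64_t evpid, uint64_t relayid) {
	printf("hold evp %016llx on holdq %016llx\n",
	    (unsigned long long)evpid, (unsigned long long)relayid);
}
static void send_release(uint64_t relayid, int n) {
	printf("release %d from holdq %016llx\n", n, (unsigned long long)relayid);
}

/* Called when the scheduler hands the MTA a new envelope for this relay.
 * Returns 1 if the envelope was held instead of becoming a task. */
static int relay_maybe_hold(struct relay *r, uint64_t evpid)
{
	if (r->ntask >= (size_t)r->task_hiwat)
		r->onhold = 1;                /* hiwat reached: start holding */
	if (r->onhold) {
		send_hold(evpid, r->id);      /* park it in the scheduler */
		return 1;
	}
	return 0;
}

/* Called whenever a task completes and r->ntask has been decremented. */
static void relay_maybe_release(struct relay *r)
{
	if (r->ntask == (size_t)r->task_lowat) {
		r->onhold = 0;                         /* back under lowat */
		send_release(r->id, r->task_release);  /* pull a small batch */
	} else if (r->ntask == 0)
		send_release(r->id, 0);                /* 0 means "release all" */
}

int main(void)
{
	struct relay r = { .id = 0x1234, .ntask = 50, .onhold = 0,
	    .task_hiwat = 50, .task_lowat = 30, .task_release = 10 };

	relay_maybe_hold(&r, 0xabcdef);   /* hiwat reached: envelope is held */
	r.ntask = 30;
	relay_maybe_release(&r);          /* back to lowat: release a batch */
	return 0;
}
```

The relay id doubles as the hold-queue identifier, and a release count of 0 means "release everything", which mta_flush() and mta_relay_unref() use to make sure no envelope stays parked for a relay that is going away.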
Diffstat (limited to 'usr.sbin')
-rw-r--r-- | usr.sbin/smtpd/limit.c | 14
-rw-r--r-- | usr.sbin/smtpd/mta.c | 59
-rw-r--r-- | usr.sbin/smtpd/queue.c | 4
-rw-r--r-- | usr.sbin/smtpd/scheduler.c | 28
-rw-r--r-- | usr.sbin/smtpd/scheduler_api.c | 40
-rw-r--r-- | usr.sbin/smtpd/scheduler_backend.c | 2
-rw-r--r-- | usr.sbin/smtpd/scheduler_null.c | 18
-rw-r--r-- | usr.sbin/smtpd/scheduler_proc.c | 56
-rw-r--r-- | usr.sbin/smtpd/scheduler_ramqueue.c | 157
-rw-r--r-- | usr.sbin/smtpd/smtpctl.c | 3
-rw-r--r-- | usr.sbin/smtpd/smtpd-api.h | 7
-rw-r--r-- | usr.sbin/smtpd/smtpd.c | 4
-rw-r--r-- | usr.sbin/smtpd/smtpd.h | 15
13 files changed, 384 insertions, 23 deletions
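The diff also grows the scheduler backend interface: `struct scheduler_backend` gains `hold` and `release` callbacks, and external schedulers get `scheduler_api_on_hold()` / `scheduler_api_on_release()` together with the new PROC_SCHEDULER_HOLD / PROC_SCHEDULER_RELEASE messages. A minimal registration sketch, assuming a backend built against smtpd-api.h; the handler names and no-op bodies are hypothetical, modeled on scheduler_null.c:

```c
#include <stdint.h>

#include "smtpd-api.h"

/* Hold: move evpid from "in flight" to the hold queue identified by holdq. */
static int
my_hold(uint64_t evpid, uint64_t holdq)
{
	(void)evpid;
	(void)holdq;
	return (0);     /* 0: nothing held (e.g. the envelope was suspended) */
}

/* Release: put up to n held envelopes back in the pending queue; n == 0
 * means all of them. */
static int
my_release(uint64_t holdq, int n)
{
	(void)holdq;
	(void)n;
	return (0);     /* number of envelopes actually released */
}

/* Registration through the new scheduler API entry points. */
void
my_backend_init(void)
{
	scheduler_api_on_hold(my_hold);
	scheduler_api_on_release(my_release);
}
```

Note that scheduler_msg_dispatch() calls these handlers unconditionally on PROC_SCHEDULER_HOLD / PROC_SCHEDULER_RELEASE, so an out-of-process backend has to register both.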
diff --git a/usr.sbin/smtpd/limit.c b/usr.sbin/smtpd/limit.c index bdacef93d3f..ff5756813af 100644 --- a/usr.sbin/smtpd/limit.c +++ b/usr.sbin/smtpd/limit.c @@ -1,4 +1,4 @@ -/* $OpenBSD: limit.c,v 1.1 2013/07/19 21:14:52 eric Exp $ */ +/* $OpenBSD: limit.c,v 1.2 2013/10/27 17:47:53 eric Exp $ */ /* * Copyright (c) 2013 Eric Faurot <eric@openbsd.org> @@ -57,6 +57,10 @@ limit_mta_set_defaults(struct mta_limits *limits) limits->sessdelay_keepalive = 10; limits->family = AF_UNSPEC; + + limits->task_hiwat = 50; + limits->task_lowat = 30; + limits->task_release = 10; } int @@ -97,6 +101,14 @@ limit_mta_set(struct mta_limits *limits, const char *key, int64_t value) limits->sessdelay_transaction = value; else if (!strcmp(key, "session-keepalive")) limits->sessdelay_keepalive = value; + + else if (!strcmp(key, "task-hiwat")) + limits->task_hiwat = value; + else if (!strcmp(key, "task-lowat")) + limits->task_lowat = value; + else if (!strcmp(key, "task-release")) + limits->task_release = value; + else return (0); diff --git a/usr.sbin/smtpd/mta.c b/usr.sbin/smtpd/mta.c index 2eeb23e6a08..e244fe41bc1 100644 --- a/usr.sbin/smtpd/mta.c +++ b/usr.sbin/smtpd/mta.c @@ -1,4 +1,4 @@ -/* $OpenBSD: mta.c,v 1.165 2013/10/27 07:56:25 eric Exp $ */ +/* $OpenBSD: mta.c,v 1.166 2013/10/27 17:47:53 eric Exp $ */ /* * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> @@ -53,6 +53,8 @@ #define DELAY_ROUTE_BASE 200 #define DELAY_ROUTE_MAX (3600 * 4) +#define RELAY_ONHOLD 0x01 + static void mta_imsg(struct mproc *, struct imsg *); static void mta_shutdown(void); static void mta_sig_handler(int, short, void *); @@ -186,6 +188,28 @@ mta_imsg(struct mproc *p, struct imsg *imsg) m_end(&m); relay = mta_relay(&evp); + /* ignore if we don't know the limits yet */ + if (relay->limits && + relay->ntask >= (size_t)relay->limits->task_hiwat) { + if (!(relay->state & RELAY_ONHOLD)) { + log_info("smtp-out: hiwat reached on %s: holding envelopes", + mta_relay_to_text(relay)); + relay->state |= RELAY_ONHOLD; + } + } + + /* + * If the relay has too many pending tasks, tell the + * scheduler to hold it until further notice + */ + if (relay->state & RELAY_ONHOLD) { + m_create(p_queue, IMSG_DELIVERY_HOLD, 0, 0, -1); + m_add_evpid(p_queue, evp.id); + m_add_id(p_queue, relay->id); + m_close(p_queue); + mta_relay_unref(relay); + return; + } TAILQ_FOREACH(task, &relay->tasks, entry) if (task->msgid == evpid_to_msgid(evp.id)) @@ -606,6 +630,25 @@ mta_route_next_task(struct mta_relay *relay, struct mta_route *route) TAILQ_REMOVE(&relay->tasks, task, entry); relay->ntask -= 1; task->relay = NULL; + + /* When the number of tasks is down to lowat, query some evp */ + if (relay->ntask == (size_t)relay->limits->task_lowat) { + if (relay->state & RELAY_ONHOLD) { + log_info("smtp-out: back to lowat on %s: releasing", + mta_relay_to_text(relay)); + relay->state &= ~RELAY_ONHOLD; + } + m_create(p_queue, IMSG_DELIVERY_RELEASE, 0, 0, -1); + m_add_id(p_queue, relay->id); + m_add_int(p_queue, relay->limits->task_release); + m_close(p_queue); + } + else if (relay->ntask == 0) { + m_create(p_queue, IMSG_DELIVERY_RELEASE, 0, 0, -1); + m_add_id(p_queue, relay->id); + m_add_int(p_queue, 0); + m_close(p_queue); + } } return (task); @@ -1115,7 +1158,7 @@ mta_route_enable(struct mta_route *route) route->flags &= ~ROUTE_DISABLED; route->flags |= ROUTE_NEW; } - + if (route->penalty) { #if DELAY_QUADRATIC route->penalty -= 1; @@ -1255,6 +1298,12 @@ mta_flush(struct mta_relay *relay, int fail, const char *error) stat_decrement("mta.task", relay->ntask); 
stat_decrement("mta.envelope", n); relay->ntask = 0; + + /* release all waiting envelopes for the relay */ + m_create(p_queue, IMSG_DELIVERY_RELEASE, 0, 0, -1); + m_add_id(p_queue, relay->id); + m_add_int(p_queue, 0); + m_close(p_queue); } /* @@ -1547,6 +1596,12 @@ mta_relay_unref(struct mta_relay *relay) if (--relay->refcount) return; + /* Make sure they are no envelopes held for this relay */ + m_create(p_queue, IMSG_DELIVERY_RELEASE, 0, 0, -1); + m_add_id(p_queue, relay->id); + m_add_int(p_queue, 0); + m_close(p_queue); + log_debug("debug: mta: freeing %s", mta_relay_to_text(relay)); SPLAY_REMOVE(mta_relay_tree, &relays, relay); diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c index d8e5e3679d8..bba07281584 100644 --- a/usr.sbin/smtpd/queue.c +++ b/usr.sbin/smtpd/queue.c @@ -1,4 +1,4 @@ -/* $OpenBSD: queue.c,v 1.154 2013/10/27 07:56:25 eric Exp $ */ +/* $OpenBSD: queue.c,v 1.155 2013/10/27 17:47:53 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org> @@ -414,6 +414,8 @@ queue_imsg(struct mproc *p, struct imsg *imsg) m_close(p_scheduler); return; + case IMSG_DELIVERY_HOLD: + case IMSG_DELIVERY_RELEASE: case IMSG_MTA_SCHEDULE: m_forward(p_scheduler, imsg); return; diff --git a/usr.sbin/smtpd/scheduler.c b/usr.sbin/smtpd/scheduler.c index 32ec639690b..7971ae3be24 100644 --- a/usr.sbin/smtpd/scheduler.c +++ b/usr.sbin/smtpd/scheduler.c @@ -1,4 +1,4 @@ -/* $OpenBSD: scheduler.c,v 1.34 2013/10/27 07:56:25 eric Exp $ */ +/* $OpenBSD: scheduler.c,v 1.35 2013/10/27 17:47:53 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org> @@ -73,7 +73,7 @@ scheduler_imsg(struct mproc *p, struct imsg *imsg) struct envelope evp; struct scheduler_info si; struct msg m; - uint64_t evpid, id; + uint64_t evpid, id, holdq; uint32_t msgid, msgids[MSGBATCHSIZE]; uint32_t inflight; uint32_t penalty; @@ -204,6 +204,30 @@ scheduler_imsg(struct mproc *p, struct imsg *imsg) scheduler_reset_events(); return; + case IMSG_DELIVERY_HOLD: + m_msg(&m, imsg); + m_get_evpid(&m, &evpid); + m_get_id(&m, &holdq); + m_end(&m); + log_trace(TRACE_SCHEDULER, + "scheduler: holding evp:%016" PRIx64 " on %016" PRIx64, + evpid, holdq); + backend->hold(evpid, holdq); + stat_decrement("scheduler.envelope.inflight", 1); + scheduler_reset_events(); + return; + + case IMSG_DELIVERY_RELEASE: + m_msg(&m, imsg); + m_get_id(&m, &holdq); + m_get_int(&m, &r); + m_end(&m); + log_trace(TRACE_SCHEDULER, + "scheduler: releasing %d on holdq %016" PRIx64, r, holdq); + backend->release(holdq, r); + scheduler_reset_events(); + return; + case IMSG_CTL_PAUSE_MDA: log_trace(TRACE_SCHEDULER, "scheduler: pausing mda"); env->sc_flags |= SMTPD_MDA_PAUSED; diff --git a/usr.sbin/smtpd/scheduler_api.c b/usr.sbin/smtpd/scheduler_api.c index 87300433853..de382075947 100644 --- a/usr.sbin/smtpd/scheduler_api.c +++ b/usr.sbin/smtpd/scheduler_api.c @@ -1,4 +1,4 @@ -/* $OpenBSD: scheduler_api.c,v 1.2 2013/10/26 12:27:59 eric Exp $ */ +/* $OpenBSD: scheduler_api.c,v 1.3 2013/10/27 17:47:53 eric Exp $ */ /* * Copyright (c) 2013 Eric Faurot <eric@openbsd.org> @@ -35,6 +35,8 @@ static size_t (*handler_commit)(uint32_t); static size_t (*handler_rollback)(uint32_t); static int (*handler_update)(struct scheduler_info *); static int (*handler_delete)(uint64_t); +static int (*handler_hold)(uint64_t, uint64_t); +static int (*handler_release)(uint64_t, int); static int (*handler_batch)(int, struct scheduler_batch *); static size_t (*handler_messages)(uint32_t, uint32_t *, size_t); static size_t (*handler_envelopes)(uint64_t, struct 
evpstate *, size_t); @@ -108,7 +110,7 @@ scheduler_msg_dispatch(void) { size_t n, sz; struct evpstate evpstates[MAX_BATCH_SIZE]; - uint64_t evpid, evpids[MAX_BATCH_SIZE]; + uint64_t evpid, evpids[MAX_BATCH_SIZE], u64; uint32_t msgids[MAX_BATCH_SIZE], version, msgid; struct scheduler_info info; struct scheduler_batch batch; @@ -183,6 +185,28 @@ scheduler_msg_dispatch(void) imsg_compose(&ibuf, PROC_SCHEDULER_OK, 0, 0, -1, &r, sizeof(r)); break; + case PROC_SCHEDULER_HOLD: + log_debug("scheduler-api: PROC_SCHEDULER_HOLD"); + scheduler_msg_get(&evpid, sizeof(evpid)); + scheduler_msg_get(&u64, sizeof(u64)); + scheduler_msg_end(); + + r = handler_hold(evpid, u64); + + imsg_compose(&ibuf, PROC_SCHEDULER_OK, 0, 0, -1, &r, sizeof(r)); + break; + + case PROC_SCHEDULER_RELEASE: + log_debug("scheduler-api: PROC_SCHEDULER_RELEASE"); + scheduler_msg_get(&u64, sizeof(u64)); + scheduler_msg_get(&r, sizeof(r)); + scheduler_msg_end(); + + r = handler_release(u64, r); + + imsg_compose(&ibuf, PROC_SCHEDULER_OK, 0, 0, -1, &r, sizeof(r)); + break; + case PROC_SCHEDULER_BATCH: log_debug("scheduler-api: PROC_SCHEDULER_BATCH"); scheduler_msg_get(&typemask, sizeof(typemask)); @@ -354,6 +378,18 @@ scheduler_api_on_resume(int(*cb)(uint64_t)) handler_resume = cb; } +void +scheduler_api_on_hold(int(*cb)(uint64_t, uint64_t)) +{ + handler_hold = cb; +} + +void +scheduler_api_on_release(int(*cb)(uint64_t, int)) +{ + handler_release = cb; +} + int scheduler_api_dispatch(void) { diff --git a/usr.sbin/smtpd/scheduler_backend.c b/usr.sbin/smtpd/scheduler_backend.c index 8df4d3693a3..80114990a46 100644 --- a/usr.sbin/smtpd/scheduler_backend.c +++ b/usr.sbin/smtpd/scheduler_backend.c @@ -1,4 +1,4 @@ -/* $OpenBSD: scheduler_backend.c,v 1.11 2013/07/19 21:34:31 eric Exp $ */ +/* $OpenBSD: scheduler_backend.c,v 1.12 2013/10/27 17:47:53 eric Exp $ */ /* * Copyright (c) 2012 Gilles Chehade <gilles@poolp.org> diff --git a/usr.sbin/smtpd/scheduler_null.c b/usr.sbin/smtpd/scheduler_null.c index 020f05869b6..8a4da95c765 100644 --- a/usr.sbin/smtpd/scheduler_null.c +++ b/usr.sbin/smtpd/scheduler_null.c @@ -1,4 +1,4 @@ -/* $OpenBSD: scheduler_null.c,v 1.4 2013/07/19 21:34:31 eric Exp $ */ +/* $OpenBSD: scheduler_null.c,v 1.5 2013/10/27 17:47:53 eric Exp $ */ /* * Copyright (c) 2012 Eric Faurot <eric@openbsd.org> @@ -36,6 +36,8 @@ static size_t scheduler_null_commit(uint32_t); static size_t scheduler_null_rollback(uint32_t); static int scheduler_null_update(struct scheduler_info *); static int scheduler_null_delete(uint64_t); +static int scheduler_null_hold(uint64_t, uint64_t); +static int scheduler_null_release(uint64_t, int); static int scheduler_null_batch(int, struct scheduler_batch *); static size_t scheduler_null_messages(uint32_t, uint32_t *, size_t); static size_t scheduler_null_envelopes(uint64_t, struct evpstate *, size_t); @@ -53,6 +55,8 @@ struct scheduler_backend scheduler_backend_null = { scheduler_null_update, scheduler_null_delete, + scheduler_null_hold, + scheduler_null_release, scheduler_null_batch, @@ -101,6 +105,18 @@ scheduler_null_delete(uint64_t evpid) } static int +scheduler_null_hold(uint64_t evpid, uint64_t holdq) +{ + return (0); +} + +static int +scheduler_null_release(uint64_t holdq, int n) +{ + return (0); +} + +static int scheduler_null_batch(int typemask, struct scheduler_batch *ret) { ret->type = SCHED_NONE; diff --git a/usr.sbin/smtpd/scheduler_proc.c b/usr.sbin/smtpd/scheduler_proc.c index dd962d93289..0e927a6aca9 100644 --- a/usr.sbin/smtpd/scheduler_proc.c +++ b/usr.sbin/smtpd/scheduler_proc.c @@ -1,4 
+1,4 @@ -/* $OpenBSD: scheduler_proc.c,v 1.1 2013/07/19 21:34:31 eric Exp $ */ +/* $OpenBSD: scheduler_proc.c,v 1.2 2013/10/27 17:47:53 eric Exp $ */ /* * Copyright (c) 2013 Eric Faurot <eric@openbsd.org> @@ -247,6 +247,58 @@ scheduler_proc_delete(uint64_t evpid) } static int +scheduler_proc_hold(uint64_t evpid, uint64_t holdq) +{ + struct ibuf *buf; + int r; + + log_debug("debug: scheduler-proc: PROC_SCHEDULER_HOLD"); + + buf = imsg_create(&ibuf, PROC_SCHEDULER_HOLD, 0, 0, + sizeof(evpid) + sizeof(holdq)); + if (buf == NULL) + return (-1); + if (imsg_add(buf, &evpid, sizeof(evpid)) == -1) + return (-1); + if (imsg_add(buf, &holdq, sizeof(holdq)) == -1) + return (-1); + imsg_close(&ibuf, buf); + + scheduler_proc_call(); + + scheduler_proc_read(&r, sizeof(r)); + scheduler_proc_end(); + + return (r); +} + +static int +scheduler_proc_release(uint64_t holdq, int n) +{ + struct ibuf *buf; + int r; + + log_debug("debug: scheduler-proc: PROC_SCHEDULER_RELEASE"); + + buf = imsg_create(&ibuf, PROC_SCHEDULER_RELEASE, 0, 0, + sizeof(holdq) + sizeof(n)); + if (buf == NULL) + return (-1); + if (imsg_add(buf, &holdq, sizeof(holdq)) == -1) + return (-1); + if (imsg_add(buf, &n, sizeof(n)) == -1) + return (-1); + imsg_close(&ibuf, buf); + + scheduler_proc_call(); + + scheduler_proc_read(&r, sizeof(r)); + scheduler_proc_end(); + + return (r); +} + +static int scheduler_proc_batch(int typemask, struct scheduler_batch *ret) { struct ibuf *buf; @@ -413,6 +465,8 @@ struct scheduler_backend scheduler_backend_proc = { scheduler_proc_rollback, scheduler_proc_update, scheduler_proc_delete, + scheduler_proc_hold, + scheduler_proc_release, scheduler_proc_batch, scheduler_proc_messages, scheduler_proc_envelopes, diff --git a/usr.sbin/smtpd/scheduler_ramqueue.c b/usr.sbin/smtpd/scheduler_ramqueue.c index e26b91831c6..f28471700b1 100644 --- a/usr.sbin/smtpd/scheduler_ramqueue.c +++ b/usr.sbin/smtpd/scheduler_ramqueue.c @@ -1,8 +1,8 @@ -/* $OpenBSD: scheduler_ramqueue.c,v 1.31 2013/10/25 19:18:43 eric Exp $ */ +/* $OpenBSD: scheduler_ramqueue.c,v 1.32 2013/10/27 17:47:53 eric Exp $ */ /* * Copyright (c) 2012 Gilles Chehade <gilles@poolp.org> - * Copyright (c) 2012-2013 Eric Faurot <eric@openbsd.org> + * Copyright (c) 2012 Eric Faurot <eric@openbsd.org> * * Permission to use, copy, modify, and distribute this software for any * purpose with or without fee is hereby granted, provided that the above @@ -47,11 +47,13 @@ struct rq_envelope { TAILQ_ENTRY(rq_envelope) entry; uint64_t evpid; + uint64_t holdq; enum delivery_type type; #define RQ_EVPSTATE_PENDING 0 #define RQ_EVPSTATE_SCHEDULED 1 #define RQ_EVPSTATE_INFLIGHT 2 +#define RQ_EVPSTATE_HELD 3 uint8_t state; #define RQ_ENVELOPE_EXPIRED 0x01 @@ -68,6 +70,10 @@ struct rq_envelope { time_t t_scheduled; }; +struct rq_holdq { + struct evplist q; +}; + struct rq_queue { size_t evpcount; struct tree messages; @@ -88,6 +94,8 @@ static size_t scheduler_ram_commit(uint32_t); static size_t scheduler_ram_rollback(uint32_t); static int scheduler_ram_update(struct scheduler_info *); static int scheduler_ram_delete(uint64_t); +static int scheduler_ram_hold(uint64_t, uint64_t); +static int scheduler_ram_release(uint64_t, int); static int scheduler_ram_batch(int, struct scheduler_batch *); static size_t scheduler_ram_messages(uint32_t, uint32_t *, size_t); static size_t scheduler_ram_envelopes(uint64_t, struct evpstate *, size_t); @@ -120,6 +128,8 @@ struct scheduler_backend scheduler_backend_ramqueue = { scheduler_ram_update, scheduler_ram_delete, + scheduler_ram_hold, + 
scheduler_ram_release, scheduler_ram_batch, @@ -133,6 +143,7 @@ struct scheduler_backend scheduler_backend_ramqueue = { static struct rq_queue ramqueue; static struct tree updates; +static struct tree holdqs; static time_t currtime; @@ -141,6 +152,7 @@ scheduler_ram_init(void) { rq_queue_init(&ramqueue); tree_init(&updates); + tree_init(&holdqs); return (1); } @@ -312,6 +324,91 @@ scheduler_ram_delete(uint64_t evpid) } static int +scheduler_ram_hold(uint64_t evpid, uint64_t holdq) +{ + struct rq_holdq *hq; + struct rq_message *msg; + struct rq_envelope *evp; + uint32_t msgid; + + currtime = time(NULL); + + msgid = evpid_to_msgid(evpid); + msg = tree_xget(&ramqueue.messages, msgid); + evp = tree_xget(&msg->envelopes, evpid); + + /* it *must* be in-flight */ + if (evp->state != RQ_EVPSTATE_INFLIGHT) + errx(1, "evp:%016" PRIx64 " not in-flight", evpid); + + TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry); + + /* If the envelope is suspended, just mark it as pending */ + if (evp->flags & RQ_ENVELOPE_SUSPEND) { + evp->state = RQ_EVPSTATE_PENDING; + return (0); + } + + hq = tree_get(&holdqs, holdq); + if (hq == NULL) { + hq = xcalloc(1, sizeof(*hq), "scheduler_hold"); + TAILQ_INIT(&hq->q); + tree_xset(&holdqs, holdq, hq); + } + + evp->state = RQ_EVPSTATE_HELD; + evp->holdq = holdq; + /* This is an optimization: upon release, the envelopes will be + * inserted in the pending queue from the first element to the last. + * Since elements already in the queue were received first, they + * were scheduled first, so they will be reinserted before the + * current element. + */ + TAILQ_INSERT_HEAD(&hq->q, evp, entry); + stat_increment("scheduler.ramqueue.hold", 1); + + return (1); +} + +static int +scheduler_ram_release(uint64_t holdq, int n) +{ + struct rq_holdq *hq; + struct rq_envelope *evp; + int i; + + currtime = time(NULL); + + hq = tree_get(&holdqs, holdq); + if (hq == NULL) + return (0); + + for (i = 0; n == 0 || i < n; i++) { + evp = TAILQ_FIRST(&hq->q); + if (evp == NULL) + break; + + TAILQ_REMOVE(&hq->q, evp, entry); + evp->holdq = 0; + + /* When released, all envelopes are put in the pending queue + * and will be rescheduled immediatly. As an optimization, + * we could just schedule them directly. 
+ */ + evp->state = RQ_EVPSTATE_PENDING; + sorted_insert(&ramqueue.q_pending, evp); + } + + if (TAILQ_EMPTY(&hq->q)) { + tree_xpop(&holdqs, holdq); + free(hq); + } + stat_decrement("scheduler.ramqueue.hold", i); + + return (i); +} + +static int scheduler_ram_batch(int typemask, struct scheduler_batch *ret) { struct evplist *q; @@ -436,6 +533,12 @@ scheduler_ram_envelopes(uint64_t from, struct evpstate *dst, size_t size) dst[n].time = evp->t_inflight; dst[n].flags = EF_INFLIGHT; } + else if (evp->state == RQ_EVPSTATE_HELD) { + /* same as scheduled */ + dst[n].time = evp->t_scheduled; + dst[n].flags = EF_PENDING; + dst[n].flags |= EF_HOLD; + } if (evp->flags & RQ_ENVELOPE_SUSPEND) dst[n].flags |= EF_SUSPEND; @@ -702,6 +805,9 @@ rq_envelope_list(struct rq_queue *rq, struct rq_envelope *evp) case RQ_EVPSTATE_INFLIGHT: return &rq->q_inflight; + + case RQ_EVPSTATE_HELD: + return (NULL); } errx(1, "%016" PRIx64 " bad state %d", evp->evpid, evp->state); @@ -711,6 +817,7 @@ rq_envelope_list(struct rq_queue *rq, struct rq_envelope *evp) static void rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp) { + struct rq_holdq *hq; struct evplist *q = NULL; switch (evp->type) { @@ -725,7 +832,17 @@ rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp) break; } - if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) + if (evp->state == RQ_EVPSTATE_HELD) { + hq = tree_xget(&holdqs, evp->holdq); + TAILQ_REMOVE(&hq->q, evp, entry); + if (TAILQ_EMPTY(&hq->q)) { + tree_xpop(&holdqs, evp->holdq); + free(hq); + } + evp->holdq = 0; + stat_decrement("scheduler.ramqueue.hold", 1); + } + else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) TAILQ_REMOVE(&rq->q_pending, evp, entry); TAILQ_INSERT_TAIL(q, evp, entry); @@ -736,6 +853,8 @@ rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp) static int rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp) { + struct rq_holdq *hq; + if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED)) return (0); /* @@ -746,7 +865,17 @@ rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp) return (1); } - if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) { + if (evp->state == RQ_EVPSTATE_HELD) { + hq = tree_xget(&holdqs, evp->holdq); + TAILQ_REMOVE(&hq->q, evp, entry); + if (TAILQ_EMPTY(&hq->q)) { + tree_xpop(&holdqs, evp->holdq); + free(hq); + } + evp->holdq = 0; + stat_decrement("scheduler.ramqueue.hold", 1); + } + else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) { TAILQ_REMOVE(rq_envelope_list(rq, evp), evp, entry); } @@ -761,10 +890,23 @@ rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp) static int rq_envelope_suspend(struct rq_queue *rq, struct rq_envelope *evp) { + struct rq_holdq *hq; + if (evp->flags & RQ_ENVELOPE_SUSPEND) return (0); - if (evp->state != RQ_EVPSTATE_INFLIGHT) { + if (evp->state == RQ_EVPSTATE_HELD) { + hq = tree_xget(&holdqs, evp->holdq); + TAILQ_REMOVE(&hq->q, evp, entry); + if (TAILQ_EMPTY(&hq->q)) { + tree_xpop(&holdqs, evp->holdq); + free(hq); + } + evp->holdq = 0; + evp->state = RQ_EVPSTATE_PENDING; + stat_decrement("scheduler.ramqueue.hold", 1); + } + else if (evp->state != RQ_EVPSTATE_INFLIGHT) { TAILQ_REMOVE(rq_envelope_list(rq, evp), evp, entry); } @@ -841,6 +983,11 @@ rq_envelope_to_text(struct rq_envelope *e) strlcat(buf, t, sizeof buf); break; + case RQ_EVPSTATE_HELD: + snprintf(t, sizeof t, ",held=%s", + duration_to_text(currtime - e->t_inflight)); + strlcat(buf, t, sizeof buf); + break; default: errx(1, "%016" PRIx64 " bad state %d", e->evpid, e->state); } diff --git a/usr.sbin/smtpd/smtpctl.c 
b/usr.sbin/smtpd/smtpctl.c index 757bb5650a9..51c91aeb083 100644 --- a/usr.sbin/smtpd/smtpctl.c +++ b/usr.sbin/smtpd/smtpctl.c @@ -1,4 +1,4 @@ -/* $OpenBSD: smtpctl.c,v 1.110 2013/10/25 21:31:23 eric Exp $ */ +/* $OpenBSD: smtpctl.c,v 1.111 2013/10/27 17:47:53 eric Exp $ */ /* * Copyright (c) 2013 Eric Faurot <eric@openbsd.org> @@ -857,6 +857,7 @@ show_queue_envelope(struct envelope *e, int online) getflag(&e->flags, EF_AUTHENTICATED, "auth", status, sizeof(status)); getflag(&e->flags, EF_INTERNAL, "internal", status, sizeof(status)); getflag(&e->flags, EF_SUSPEND, "suspend", status, sizeof(status)); + getflag(&e->flags, EF_HOLD, "hold", status, sizeof(status)); if (online) { if (e->flags & EF_PENDING) diff --git a/usr.sbin/smtpd/smtpd-api.h b/usr.sbin/smtpd/smtpd-api.h index b8d3f07454a..46eeaa01868 100644 --- a/usr.sbin/smtpd/smtpd-api.h +++ b/usr.sbin/smtpd/smtpd-api.h @@ -1,4 +1,4 @@ -/* $OpenBSD: smtpd-api.h,v 1.9 2013/10/27 11:01:47 eric Exp $ */ +/* $OpenBSD: smtpd-api.h,v 1.10 2013/10/27 17:47:53 eric Exp $ */ /* * Copyright (c) 2013 Eric Faurot <eric@openbsd.org> @@ -118,6 +118,8 @@ enum { PROC_SCHEDULER_ROLLBACK, PROC_SCHEDULER_UPDATE, PROC_SCHEDULER_DELETE, + PROC_SCHEDULER_HOLD, + PROC_SCHEDULER_RELEASE, PROC_SCHEDULER_BATCH, PROC_SCHEDULER_MESSAGES, PROC_SCHEDULER_ENVELOPES, @@ -137,6 +139,7 @@ enum envelope_flags { EF_PENDING = 0x10, EF_INFLIGHT = 0x20, EF_SUSPEND = 0x40, + EF_HOLD = 0x80, }; struct evpstate { @@ -277,6 +280,8 @@ void scheduler_api_on_commit(size_t(*)(uint32_t)); void scheduler_api_on_rollback(size_t(*)(uint32_t)); void scheduler_api_on_update(int(*)(struct scheduler_info *)); void scheduler_api_on_delete(int(*)(uint64_t)); +void scheduler_api_on_hold(int(*)(uint64_t, uint64_t)); +void scheduler_api_on_release(int(*)(uint64_t, int)); void scheduler_api_on_batch(int(*)(int, struct scheduler_batch *)); void scheduler_api_on_messages(size_t(*)(uint32_t, uint32_t *, size_t)); void scheduler_api_on_envelopes(size_t(*)(uint64_t, struct evpstate *, size_t)); diff --git a/usr.sbin/smtpd/smtpd.c b/usr.sbin/smtpd/smtpd.c index d61d62ac864..620e14f6e09 100644 --- a/usr.sbin/smtpd/smtpd.c +++ b/usr.sbin/smtpd/smtpd.c @@ -1,4 +1,4 @@ -/* $OpenBSD: smtpd.c,v 1.201 2013/10/27 11:01:47 eric Exp $ */ +/* $OpenBSD: smtpd.c,v 1.202 2013/10/27 17:47:53 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org> @@ -1491,6 +1491,8 @@ imsg_to_str(int type) CASE(IMSG_DELIVERY_TEMPFAIL); CASE(IMSG_DELIVERY_PERMFAIL); CASE(IMSG_DELIVERY_LOOP); + CASE(IMSG_DELIVERY_HOLD); + CASE(IMSG_DELIVERY_RELEASE); CASE(IMSG_BOUNCE_INJECT); diff --git a/usr.sbin/smtpd/smtpd.h b/usr.sbin/smtpd/smtpd.h index d3ac351c284..e48e756a141 100644 --- a/usr.sbin/smtpd/smtpd.h +++ b/usr.sbin/smtpd/smtpd.h @@ -1,4 +1,4 @@ -/* $OpenBSD: smtpd.h,v 1.427 2013/10/27 11:01:47 eric Exp $ */ +/* $OpenBSD: smtpd.h,v 1.428 2013/10/27 17:47:53 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org> @@ -145,12 +145,11 @@ union lookup { struct addrname addrname; }; -/* XXX */ /* * Bump IMSG_VERSION whenever a change is made to enum imsg_type. * This will ensure that we can never use a wrong version of smtpctl with smtpd. 
*/ -#define IMSG_VERSION 5 +#define IMSG_VERSION 6 enum imsg_type { IMSG_NONE, @@ -210,6 +209,8 @@ enum imsg_type { IMSG_DELIVERY_TEMPFAIL, IMSG_DELIVERY_PERMFAIL, IMSG_DELIVERY_LOOP, + IMSG_DELIVERY_HOLD, + IMSG_DELIVERY_RELEASE, IMSG_BOUNCE_INJECT, @@ -226,7 +227,6 @@ enum imsg_type { IMSG_MFA_EVENT_COMMIT, IMSG_MFA_EVENT_ROLLBACK, IMSG_MFA_EVENT_DISCONNECT, - IMSG_MFA_SMTP_DATA, IMSG_MFA_SMTP_RESPONSE, IMSG_MTA_TRANSFER, @@ -677,6 +677,10 @@ struct mta_limits { time_t sessdelay_keepalive; int family; + + int task_hiwat; + int task_lowat; + int task_release; }; struct mta_relay { @@ -697,6 +701,7 @@ struct mta_relay { char *heloname; char *secret; + int state; size_t ntask; TAILQ_HEAD(, mta_task) tasks; @@ -781,6 +786,8 @@ struct scheduler_backend { int (*update)(struct scheduler_info *); int (*delete)(uint64_t); + int (*hold)(uint64_t, uint64_t); + int (*release)(uint64_t, int); int (*batch)(int, struct scheduler_batch *); |
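For reference, the hold/release bookkeeping added to scheduler_ramqueue.c boils down to the following. This is a reduced, stand-alone model rather than the code from the diff: the `holdqs` tree, the envelope state flags and the stat counters are left out, and `holdq_hold()` / `holdq_release()` are hypothetical names.

```c
#include <sys/queue.h>

/*
 * Reduced model of the ramqueue hold-queue bookkeeping.  The real backend
 * keeps a tree of these keyed by the 64-bit hold-queue id (the relay id),
 * creates them lazily on the first hold and frees them once drained; it
 * also tracks per-envelope state (RQ_EVPSTATE_HELD) and stat counters.
 */
struct envelope {
	TAILQ_ENTRY(envelope) entry;
};
TAILQ_HEAD(evplist, envelope);

/* Hold: park an envelope that was in flight.  New envelopes go at the
 * head; see the ordering comment in scheduler_ram_hold() in the diff. */
static void
holdq_hold(struct evplist *hq, struct envelope *evp)
{
	TAILQ_INSERT_HEAD(hq, evp, entry);
}

/* Release: move up to n envelopes back to the pending list, n == 0 meaning
 * all of them.  Returns how many were moved.  The real code re-inserts
 * them with sorted_insert() so the pending queue stays time-ordered. */
static int
holdq_release(struct evplist *hq, struct evplist *pending, int n)
{
	struct envelope *evp;
	int i;

	for (i = 0; n == 0 || i < n; i++) {
		if ((evp = TAILQ_FIRST(hq)) == NULL)
			break;
		TAILQ_REMOVE(hq, evp, entry);
		TAILQ_INSERT_TAIL(pending, evp, entry);
	}
	return (i);
}
```

In the actual backend a hold queue is allocated on the first held envelope and freed as soon as a release drains it, so relays that never back up cost nothing.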