diff options
author | Eric Faurot <eric@cvs.openbsd.org> | 2012-08-08 08:50:43 +0000 |
---|---|---|
committer | Eric Faurot <eric@cvs.openbsd.org> | 2012-08-08 08:50:43 +0000 |
commit | 4fb538ff7fe692fc3494a906f82178b14bf48bf1 (patch) | |
tree | 74c2a8e0bb894841c747e6bee85be58c05869135 /usr.sbin/smtpd/queue.c | |
parent | e653821d1952b330839a2b474720109458d1d375 (diff) |
Improve the scheduler backend API.
New envelopes are pushed into the scheduler through the insert()
commit() rollback() transactional interface functions.
Worklists are pulled from the scheduler through a single batch()
interface function, which returns a list of envelope ids and the
type of processing. Envelopes returned in this batch are said to
be "in-flight", as opposed to "pending". They are supposed to be
processed in some way, and either updated() or deleted() at some
point.
The schedule()/remove() functions are used to alter the internal
state of "pending" envelopes to make them schedulable. The enve-
lopes will be part of a worklist on the next call to batch().
Rewrite the scheduler_ramqueue backend.
The initial queue loading in now done by the queue.
ok gilles@
Diffstat (limited to 'usr.sbin/smtpd/queue.c')
-rw-r--r-- | usr.sbin/smtpd/queue.c | 58 |
1 files changed, 56 insertions, 2 deletions
diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c index d0e9549607b..c8c2d31403c 100644 --- a/usr.sbin/smtpd/queue.c +++ b/usr.sbin/smtpd/queue.c @@ -1,8 +1,9 @@ -/* $OpenBSD: queue.c,v 1.121 2012/07/09 09:57:53 gilles Exp $ */ +/* $OpenBSD: queue.c,v 1.122 2012/08/08 08:50:42 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> + * Copyright (c) 2012 Eric Faurot <eric@openbsd.org> * * Permission to use, copy, modify, and distribute this software for any * purpose with or without fee is hereby granted, provided that the above @@ -39,6 +40,7 @@ #include "log.h" static void queue_imsg(struct imsgev *, struct imsg *); +static void queue_timeout(int, short, void *); static void queue_pass_to_scheduler(struct imsgev *, struct imsg *); static void queue_shutdown(void); static void queue_sig_handler(int, short, void *); @@ -224,7 +226,8 @@ queue(void) { pid_t pid; struct passwd *pw; - + struct timeval tv; + struct event ev_qload; struct event ev_sigint; struct event ev_sigterm; @@ -286,6 +289,12 @@ queue(void) config_pipes(peers, nitems(peers)); config_peers(peers, nitems(peers)); + /* setup queue loading task */ + evtimer_set(&ev_qload, queue_timeout, &ev_qload); + tv.tv_sec = 0; + tv.tv_usec = 10; + evtimer_add(&ev_qload, &tv); + if (event_dispatch() < 0) fatal("event_dispatch"); queue_shutdown(); @@ -294,6 +303,51 @@ queue(void) } void +queue_timeout(int fd, short event, void *p) +{ + static struct qwalk *q = NULL; + struct event *ev = p; + static uint64_t last_evpid = 0; + struct envelope envelope; + struct timeval tv; + uint64_t evpid; + + if (q == NULL) { + log_info("queue: loading queue into scheduler"); + q = qwalk_new(0); + } + + while (qwalk(q, &evpid)) { + if (! queue_envelope_load(evpid, &envelope)) + continue; + + if (evpid_to_msgid(evpid) != evpid_to_msgid(last_evpid) && + last_evpid != 0) { + envelope.id = last_evpid; + imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], + IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &envelope, + sizeof envelope); + } + + last_evpid = evpid; + tv.tv_sec = 0; + tv.tv_usec = 0; + evtimer_add(ev, &tv); + return; + } + + if (last_evpid) { + envelope.id = last_evpid; + imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], + IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &envelope, + sizeof envelope); + } + + log_info("queue: done loading queue into scheduler"); + qwalk_close(q); +} + +void queue_submit_envelope(struct envelope *ep) { imsg_compose_event(env->sc_ievs[PROC_QUEUE], |