summaryrefslogtreecommitdiff
path: root/usr.sbin/smtpd/queue.c
diff options
context:
space:
mode:
author Eric Faurot <eric@cvs.openbsd.org> 2012-08-08 08:50:43 +0000
committer Eric Faurot <eric@cvs.openbsd.org> 2012-08-08 08:50:43 +0000
commit 4fb538ff7fe692fc3494a906f82178b14bf48bf1 (patch)
tree 74c2a8e0bb894841c747e6bee85be58c05869135 /usr.sbin/smtpd/queue.c
parent e653821d1952b330839a2b474720109458d1d375 (diff)
Improve the scheduler backend API.
New envelopes are pushed into the scheduler through the insert() commit() rollback() transactional interface functions. Worklists are pulled from the scheduler through a single batch() interface function, which returns a list of envelope ids and the type of processing. Envelopes returned in this batch are said to be "in-flight", as opposed to "pending". They are supposed to be processed in some way, and either updated() or deleted() at some point. The schedule()/remove() functions are used to alter the internal state of "pending" envelopes to make them schedulable. The envelopes will be part of a worklist on the next call to batch(). Rewrite the scheduler_ramqueue backend. The initial queue loading is now done by the queue. ok gilles@
Diffstat (limited to 'usr.sbin/smtpd/queue.c')
-rw-r--r-- usr.sbin/smtpd/queue.c 58
1 files changed, 56 insertions, 2 deletions
diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c
index d0e9549607b..c8c2d31403c 100644
--- a/usr.sbin/smtpd/queue.c
+++ b/usr.sbin/smtpd/queue.c
@@ -1,8 +1,9 @@
-/* $OpenBSD: queue.c,v 1.121 2012/07/09 09:57:53 gilles Exp $ */
+/* $OpenBSD: queue.c,v 1.122 2012/08/08 08:50:42 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
* Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
+ * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
@@ -39,6 +40,7 @@
#include "log.h"
static void queue_imsg(struct imsgev *, struct imsg *);
+static void queue_timeout(int, short, void *);
static void queue_pass_to_scheduler(struct imsgev *, struct imsg *);
static void queue_shutdown(void);
static void queue_sig_handler(int, short, void *);
@@ -224,7 +226,8 @@ queue(void)
{
pid_t pid;
struct passwd *pw;
-
+ struct timeval tv;
+ struct event ev_qload;
struct event ev_sigint;
struct event ev_sigterm;
@@ -286,6 +289,12 @@ queue(void)
config_pipes(peers, nitems(peers));
config_peers(peers, nitems(peers));
+ /* setup queue loading task */
+ evtimer_set(&ev_qload, queue_timeout, &ev_qload);
+ tv.tv_sec = 0;
+ tv.tv_usec = 10;
+ evtimer_add(&ev_qload, &tv);
+
if (event_dispatch() < 0)
fatal("event_dispatch");
queue_shutdown();
@@ -294,6 +303,51 @@ queue(void)
}
void
+queue_timeout(int fd, short event, void *p) /* evtimer callback armed in queue(): handles at most one loadable envelope per call, then re-arms itself; fd and event are unused */
+{
+ static struct qwalk *q = NULL; /* queue walker, created on the first call and kept across calls */
+ struct event *ev = p; /* the timer event itself, passed as the callback argument so it can be re-added */
+ static uint64_t last_evpid = 0; /* id of the most recently loaded envelope; 0 means none yet */
+ struct envelope envelope;
+ struct timeval tv;
+ uint64_t evpid;
+
+ if (q == NULL) { /* first invocation: start the walk */
+ log_info("queue: loading queue into scheduler");
+ q = qwalk_new(0);
+ }
+
+ while (qwalk(q, &evpid)) {
+ if (! queue_envelope_load(evpid, &envelope))
+ continue; /* skip envelopes that fail to load and try the next one */
+
+ if (evpid_to_msgid(evpid) != evpid_to_msgid(last_evpid) && /* message id differs from the previous envelope's */
+ last_evpid != 0) {
+ envelope.id = last_evpid; /* NOTE(review): overwrites the id of the envelope just loaded; presumably the receiver only reads the id of a commit message - confirm */
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &envelope,
+ sizeof envelope);
+ }
+
+ last_evpid = evpid; /* NOTE(review): the loaded envelope itself is not sent to the scheduler here, only commit messages are - confirm this is intended */
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+ evtimer_add(ev, &tv); /* re-arm with a zero timeout to continue the walk on the next call */
+ return; /* one loaded envelope per call */
+ }
+
+ if (last_evpid) { /* walk finished: send a commit for the last envelope seen, if any */
+ envelope.id = last_evpid; /* NOTE(review): envelope may not have been loaded during this call, so the other fields sent below can be uninitialized stack data */
+ imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+ IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &envelope,
+ sizeof envelope);
+ }
+
+ log_info("queue: done loading queue into scheduler");
+ qwalk_close(q); /* NOTE(review): static q is not reset to NULL after closing; the timer is not re-armed on this path */
+}
+
+void
queue_submit_envelope(struct envelope *ep)
{
imsg_compose_event(env->sc_ievs[PROC_QUEUE],