diff options
author | Todd C. Miller <millert@cvs.openbsd.org> | 2007-02-04 13:17:36 +0000 |
---|---|---|
committer | Todd C. Miller <millert@cvs.openbsd.org> | 2007-02-04 13:17:36 +0000 |
commit | 711499ec7ced45d0f01061c4c8685e094afc9f5a (patch) | |
tree | 286d962cad334d6cb582057a4cbe535db6fe978f /gnu/usr.sbin/sendmail/libmilter/worker.c | |
parent | ec78776b871ba7b25c66902e80c5c4718d91b2cf (diff) |
Update to sendmail-8.14.0. OK mbalmer@
Diffstat (limited to 'gnu/usr.sbin/sendmail/libmilter/worker.c')
-rw-r--r-- | gnu/usr.sbin/sendmail/libmilter/worker.c | 792 |
1 files changed, 792 insertions, 0 deletions
diff --git a/gnu/usr.sbin/sendmail/libmilter/worker.c b/gnu/usr.sbin/sendmail/libmilter/worker.c new file mode 100644 index 00000000000..7ae3f315919 --- /dev/null +++ b/gnu/usr.sbin/sendmail/libmilter/worker.c @@ -0,0 +1,792 @@ +/* + * Copyright (c) 2003-2004, 2006 Sendmail, Inc. and its suppliers. + * All rights reserved. + * + * By using this file, you agree to the terms and conditions set + * forth in the LICENSE file which can be found at the top level of + * the sendmail distribution. + * + * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris + * Jose-Marcio.Martins@ensmp.fr + */ + +#include <sm/gen.h> +SM_RCSID("@(#)$Sendmail: worker.c,v 8.9 2006/12/18 18:26:51 ca Exp $") + +#include "libmilter.h" + +#if _FFR_WORKERS_POOL + +typedef struct taskmgr_S taskmgr_T; + +#define TM_SIGNATURE 0x23021957 + +struct taskmgr_S +{ + long tm_signature; /* has the controller been initialized */ + sthread_t tm_tid; /* thread id of controller */ + smfi_hd_T tm_ctx_head; /* head of the linked list of contexts */ + + int tm_nb_workers; /* number of workers in the pool */ + int tm_nb_idle; /* number of workers waiting */ + + int tm_p[2]; /* poll control pipe */ + + smutex_t tm_w_mutex; /* linked list access mutex */ + scond_t tm_w_cond; /* */ +}; + +static taskmgr_T Tskmgr = {0}; + +#define WRK_CTX_HEAD Tskmgr.tm_ctx_head + +#define RD_PIPE (Tskmgr.tm_p[0]) +#define WR_PIPE (Tskmgr.tm_p[1]) + +#define PIPE_SEND_SIGNAL() \ + do \ + { \ + char evt = 0x5a; \ + int fd = WR_PIPE; \ + if (write(fd, &evt, sizeof(evt)) != sizeof(evt)) \ + smi_log(SMI_LOG_ERR, \ + "Error writing to event pipe: %s", \ + sm_errstring(errno)); \ + } while (0) + +#ifndef USE_PIPE_WAKE_POLL +# define USE_PIPE_WAKE_POLL 1 +#endif /* USE_PIPE_WAKE_POLL */ + +/* poll check periodicity (default 10000 - 10 s) */ +#define POLL_TIMEOUT 10000 + +/* worker conditional wait timeout (default 10 s) */ +#define COND_TIMEOUT 10 + +/* functions */ +static int mi_close_session __P((SMFICTX_PTR)); + +static void *mi_worker __P((void *)); +static void *mi_pool_controller __P((void *)); + +static int mi_list_add_ctx __P((SMFICTX_PTR)); +static int mi_list_del_ctx __P((SMFICTX_PTR)); + +/* +** periodicity of cleaning up old sessions (timedout) +** sessions list will be checked to find old inactive +** sessions each DT_CHECK_OLD_SESSIONS sec +*/ + +#define DT_CHECK_OLD_SESSIONS 600 + +#ifndef OLD_SESSION_TIMEOUT +# define OLD_SESSION_TIMEOUT ctx->ctx_timeout +#endif /* OLD_SESSION_TIMEOUT */ + +/* session states - with respect to the pool of workers */ +#define WKST_INIT 0 /* initial state */ +#define WKST_READY_TO_RUN 1 /* command ready do be read */ +#define WKST_RUNNING 2 /* session running on a worker */ +#define WKST_READY_TO_WAIT 3 /* session just finished by a worker */ +#define WKST_WAITING 4 /* waiting for new command */ +#define WKST_CLOSING 5 /* session finished */ + +#ifndef MIN_WORKERS +# define MIN_WORKERS 2 /* minimum number of threads to keep around */ +#endif + +#define MIN_IDLE 1 /* minimum number of idle threads */ + + +/* +** Macros for threads and mutex management +*/ + +#define TASKMGR_LOCK() \ + do \ + { \ + if (!smutex_lock(&Tskmgr.tm_w_mutex)) \ + smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error"); \ + } while (0) + +#define TASKMGR_UNLOCK() \ + do \ + { \ + if (!smutex_unlock(&Tskmgr.tm_w_mutex)) \ + smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error"); \ + } while (0) + +#define TASKMGR_COND_WAIT() \ + scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT) + +#define TASKMGR_COND_SIGNAL() \ + do \ + { \ + if (scond_signal(&Tskmgr.tm_w_cond) != 0) \ + smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \ + } while (0) + +#define LAUNCH_WORKER(ctx) \ + do \ + { \ + int r; \ + sthread_t tid; \ + \ + if ((r = thread_create(&tid, mi_worker, ctx)) != 0) \ + smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",\ + sm_errstring(r)); \ + } while (0) + +#if POOL_DEBUG +# define POOL_LEV_DPRINTF(lev, x) \ + do { \ + if ((lev) < ctx->ctx_dbg) \ + sm_dprintf x; \ + } while (0) +#else /* POOL_DEBUG */ +# define POOL_LEV_DPRINTF(lev, x) +#endif /* POOL_DEBUG */ + +/* +** MI_START_SESSION -- Start a session in the pool of workers +** +** Parameters: +** ctx -- context structure +** +** Returns: +** MI_SUCCESS/MI_FAILURE +*/ + +int +mi_start_session(ctx) + SMFICTX_PTR ctx; +{ + static long id = 0; + + SM_ASSERT(Tskmgr.tm_signature == TM_SIGNATURE); + SM_ASSERT(ctx != NULL); + POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE)); + TASKMGR_LOCK(); + + if (mi_list_add_ctx(ctx) != MI_SUCCESS) + { + TASKMGR_UNLOCK(); + return MI_FAILURE; + } + + ctx->ctx_sid = id++; + + /* if there is an idle worker, signal it, otherwise start new worker */ + if (Tskmgr.tm_nb_idle > 0) + { + ctx->ctx_wstate = WKST_READY_TO_RUN; + TASKMGR_COND_SIGNAL(); + } + else + { + ctx->ctx_wstate = WKST_RUNNING; + LAUNCH_WORKER(ctx); + } + TASKMGR_UNLOCK(); + return MI_SUCCESS; +} + +/* +** MI_CLOSE_SESSION -- Close a session and clean up data structures +** +** Parameters: +** ctx -- context structure +** +** Returns: +** MI_SUCCESS/MI_FAILURE +*/ + +static int +mi_close_session(ctx) + SMFICTX_PTR ctx; +{ + SM_ASSERT(ctx != NULL); + + (void) mi_list_del_ctx(ctx); + if (ValidSocket(ctx->ctx_sd)) + { + (void) closesocket(ctx->ctx_sd); + ctx->ctx_sd = INVALID_SOCKET; + } + if (ctx->ctx_reply != NULL) + { + free(ctx->ctx_reply); + ctx->ctx_reply = NULL; + } + if (ctx->ctx_privdata != NULL) + { + smi_log(SMI_LOG_WARN, "%s: private data not NULL", + ctx->ctx_smfi->xxfi_name); + } + mi_clr_macros(ctx, 0); + free(ctx); + + return MI_SUCCESS; +} + +/* +** MI_POOL_CONTROLER_INIT -- Launch the worker pool controller +** Must be called before starting sessions. +** +** Parameters: +** none +** +** Returns: +** MI_SUCCESS/MI_FAILURE +*/ + +int +mi_pool_controller_init() +{ + sthread_t tid; + int r, i; + + if (Tskmgr.tm_signature == TM_SIGNATURE) + return MI_SUCCESS; + + SM_TAILQ_INIT(&WRK_CTX_HEAD); + Tskmgr.tm_tid = (sthread_t) -1; + Tskmgr.tm_nb_workers = 0; + Tskmgr.tm_nb_idle = 0; + + if (pipe(Tskmgr.tm_p) != 0) + { + smi_log(SMI_LOG_ERR, "can't create event pipe: %s", + sm_errstring(r)); + return MI_FAILURE; + } + + POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE)); + + (void) smutex_init(&Tskmgr.tm_w_mutex); + (void) scond_init(&Tskmgr.tm_w_cond); + + /* Launch the pool controller */ + if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0) + { + smi_log(SMI_LOG_ERR, "can't create controller thread: %s", + sm_errstring(r)); + return MI_FAILURE; + } + Tskmgr.tm_tid = tid; + Tskmgr.tm_signature = TM_SIGNATURE; + + /* Create the pool of workers */ + for (i = 0; i < MIN_WORKERS; i++) + { + if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0) + { + smi_log(SMI_LOG_ERR, "can't create workers crew: %s", + sm_errstring(r)); + return MI_FAILURE; + } + } + + return MI_SUCCESS; +} + +/* +** MI_POOL_CONTROLLER -- manage the pool of workers +** This thread must be running when listener begins +** starting sessions +** +** Parameters: +** arg -- unused +** +** Returns: +** NULL +** +** Control flow: +** for (;;) +** Look for timed out sessions +** Select sessions to wait for sendmail command +** Poll set of file descriptors +** if timeout +** continue +** For each file descriptor ready +** launch new thread if no worker available +** else +** signal waiting worker +*/ + +/* Poll structure array (pollfd) size step */ +#define PFD_STEP 256 + +#define WAIT_FD(i) (pfd[i].fd) +#define WAITFN "POLL" + +static void * +mi_pool_controller(arg) + void *arg; +{ + struct pollfd *pfd = NULL; + int dim_pfd = 0; + bool rebuild_set = true; + int pcnt = 0; /* error count for poll() failures */ + + Tskmgr.tm_tid = sthread_get_id(); + if (pthread_detach(Tskmgr.tm_tid) != 0) + { + smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread"); + return NULL; + } + + pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd)); + if (pfd == NULL) + { + smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s", + sm_errstring(errno)); + return NULL; + } + dim_pfd = PFD_STEP; + + for (;;) + { + SMFICTX_PTR ctx; + int nfd, rfd, i; + time_t now; + time_t lastcheck; + + POOL_LEV_DPRINTF(4, ("Let's %s again...", WAITFN)); + + if (mi_stop() != MILTER_CONT) + break; + + TASKMGR_LOCK(); + + now = time(NULL); + + /* check for timed out sessions? */ + if (lastcheck + DT_CHECK_OLD_SESSIONS < now) + { + SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) + { + if (ctx->ctx_wstate == WKST_WAITING) + { + if (ctx->ctx_wait == 0) + { + ctx->ctx_wait = now; + continue; + } + + /* if session timed out, close it */ + if (ctx->ctx_wait + OLD_SESSION_TIMEOUT + < now) + { + sfsistat (*fi_close) __P((SMFICTX *)); + + POOL_LEV_DPRINTF(4, + ("Closing old connection: sd=%d id=%d", + ctx->ctx_sd, + ctx->ctx_sid)); + + if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL) + (void) (*fi_close)(ctx); + + mi_close_session(ctx); + ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD); + continue; + } + } + } + lastcheck = now; + } + + if (rebuild_set) + { + /* + ** Initialize poll set. + ** Insert into the poll set the file descriptors of + ** all sessions waiting for a command from sendmail. + */ + + nfd = 0; + + /* begin with worker pipe */ + pfd[nfd].fd = RD_PIPE; + pfd[nfd].events = MI_POLL_RD_FLAGS; + pfd[nfd].revents = 0; + nfd++; + + SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) + { + /* + ** update ctx_wait - start of wait moment - + ** for timeout + */ + + if (ctx->ctx_wstate == WKST_READY_TO_WAIT) + ctx->ctx_wait = now; + + /* add the session to the pollfd array? */ + if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) || + (ctx->ctx_wstate == WKST_WAITING)) + { + /* + ** Resize the pollfd array if it + ** isn't large enough. + */ + + if (nfd >= dim_pfd) + { + struct pollfd *tpfd; + size_t new; + + new = (dim_pfd + PFD_STEP) * + sizeof(*tpfd); + tpfd = (struct pollfd *) + realloc(pfd, new); + if (tpfd != NULL) + { + pfd = tpfd; + dim_pfd += PFD_STEP; + } + else + { + smi_log(SMI_LOG_ERR, + "Failed to realloc pollfd array:%s", + sm_errstring(errno)); + } + } + + /* add the session to pollfd array */ + if (nfd < dim_pfd) + { + ctx->ctx_wstate = WKST_WAITING; + pfd[nfd].fd = ctx->ctx_sd; + pfd[nfd].events = MI_POLL_RD_FLAGS; + pfd[nfd].revents = 0; + nfd++; + } + } + } + } + + TASKMGR_UNLOCK(); + + /* Everything is ready, let's wait for an event */ + rfd = poll(pfd, nfd, POLL_TIMEOUT); + + POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d", + WAITFN, now, nfd)); + + /* timeout */ + if (rfd == 0) + continue; + + rebuild_set = true; + + /* error */ + if (rfd < 0) + { + if (errno == EINTR) + continue; + pcnt++; + smi_log(SMI_LOG_ERR, + "%s() failed (%s), %s", + WAITFN, sm_errstring(errno), + pcnt >= MAX_FAILS_S ? "abort" : "try again"); + + if (pcnt >= MAX_FAILS_S) + goto err; + } + pcnt = 0; + + /* something happened */ + for (i = 0; i < nfd; i++) + { + if (pfd[i].revents == 0) + continue; + + POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ", + WAITFN, i, nfd, + WAIT_FD(i))); + + /* has a worker signaled an end of task ? */ + if (WAIT_FD(i) == RD_PIPE) + { + char evt = 0; + int r = 0; + + POOL_LEV_DPRINTF(4, + ("PIPE WILL READ evt = %08X %08X", + pfd[i].events, pfd[i].revents)); + + if ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0) + { + r = read(RD_PIPE, &evt, sizeof(evt)); + if (r == sizeof(evt)) + { + /* Do nothing */ + } + } + + POOL_LEV_DPRINTF(4, + ("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]", + i, RD_PIPE, r, evt)); + + if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0) + { + /* Exception handling */ + } + continue; + } + + /* no ! sendmail wants to send a command */ + SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) + { + if (ctx->ctx_wstate != WKST_WAITING) + continue; + + POOL_LEV_DPRINTF(4, + ("Checking context sd=%d - fd=%d ", + ctx->ctx_sd , WAIT_FD(i))); + + if (ctx->ctx_sd == pfd[i].fd) + { + TASKMGR_LOCK(); + + POOL_LEV_DPRINTF(4, + ("TASK: found %d for fd[%d]=%d", + ctx->ctx_sid, i, WAIT_FD(i))); + + if (Tskmgr.tm_nb_idle > 0) + { + ctx->ctx_wstate = WKST_READY_TO_RUN; + TASKMGR_COND_SIGNAL(); + } + else + { + ctx->ctx_wstate = WKST_RUNNING; + LAUNCH_WORKER(ctx); + } + TASKMGR_UNLOCK(); + break; + } + } + + POOL_LEV_DPRINTF(4, + ("TASK %s FOUND - Checking PIPE for fd[%d]", + ctx != NULL ? "" : "NOT", WAIT_FD(i))); + } + } + + err: + if (pfd != NULL) + free(pfd); + + Tskmgr.tm_signature = 0; + for (;;) + { + SMFICTX_PTR ctx; + + ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD); + if (ctx == NULL) + break; + mi_close_session(ctx); + } + + (void) smutex_destroy(&Tskmgr.tm_w_mutex); + (void) scond_destroy(&Tskmgr.tm_w_cond); + + return NULL; +} + +/* +** Look for a task ready to run. +** Value of ctx is NULL or a pointer to a task ready to run. +*/ + +#define GET_TASK_READY_TO_RUN() \ + SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) \ + { \ + if (ctx->ctx_wstate == WKST_READY_TO_RUN) \ + { \ + ctx->ctx_wstate = WKST_RUNNING; \ + break; \ + } \ + } + +/* +** MI_WORKER -- worker thread +** executes tasks distributed by the mi_pool_controller +** or by mi_start_session +** +** Parameters: +** arg -- pointer to context structure +** +** Returns: +** NULL pointer +*/ + +static void * +mi_worker(arg) + void *arg; +{ + SMFICTX_PTR ctx; + bool done; + sthread_t t_id; + int r; + + ctx = (SMFICTX_PTR) arg; + done = false; + if (ctx != NULL) + ctx->ctx_wstate = WKST_RUNNING; + + t_id = sthread_get_id(); + if (pthread_detach(t_id) != 0) + { + smi_log(SMI_LOG_ERR, "Failed to detach worker thread"); + if (ctx != NULL) + ctx->ctx_wstate = WKST_READY_TO_RUN; + return NULL; + } + + TASKMGR_LOCK(); + Tskmgr.tm_nb_workers++; + TASKMGR_UNLOCK(); + + while (!done) + { + if (mi_stop() != MILTER_CONT) + break; + + /* let's handle next task... */ + if (ctx != NULL) + { + int res; + + POOL_LEV_DPRINTF(4, + ("worker %d: new task -> let's handle it", + t_id)); + res = mi_engine(ctx); + POOL_LEV_DPRINTF(4, + ("worker %d: mi_engine returned %d", t_id, res)); + + TASKMGR_LOCK(); + if (res != MI_CONTINUE) + { + ctx->ctx_wstate = WKST_CLOSING; + + /* + ** Delete context from linked list of + ** sessions and close session. + */ + + mi_close_session(ctx); + } + else + { + ctx->ctx_wstate = WKST_READY_TO_WAIT; + + POOL_LEV_DPRINTF(4, + ("writing to event pipe...")); + + /* + ** Signal task controller to add new session + ** to poll set. + */ + + PIPE_SEND_SIGNAL(); + } + TASKMGR_UNLOCK(); + ctx = NULL; + + } + + /* check if there is any task waiting to be served */ + TASKMGR_LOCK(); + + GET_TASK_READY_TO_RUN(); + + /* Got a task? */ + if (ctx != NULL) + { + TASKMGR_UNLOCK(); + continue; + } + + /* + ** if not, let's check if there is enough idle workers + ** if yes: quit + */ + + if (Tskmgr.tm_nb_workers > MIN_WORKERS && + Tskmgr.tm_nb_idle > MIN_IDLE) + done = true; + + POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id, + Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1)); + + if (done) + { + POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id)); + Tskmgr.tm_nb_workers--; + TASKMGR_UNLOCK(); + continue; + } + + /* + ** if no task ready to run, wait for another one + */ + + Tskmgr.tm_nb_idle++; + TASKMGR_COND_WAIT(); + Tskmgr.tm_nb_idle--; + + /* look for a task */ + GET_TASK_READY_TO_RUN(); + + TASKMGR_UNLOCK(); + } + return NULL; +} + +/* +** MI_LIST_ADD_CTX -- add new session to linked list +** +** Parameters: +** ctx -- context structure +** +** Returns: +** MI_FAILURE/MI_SUCCESS +*/ + +static int +mi_list_add_ctx(ctx) + SMFICTX_PTR ctx; +{ + SM_ASSERT(ctx != NULL); + SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link); + return MI_SUCCESS; +} + +/* +** MI_LIST_DEL_CTX -- remove session from linked list when finished +** +** Parameters: +** ctx -- context structure +** +** Returns: +** MI_FAILURE/MI_SUCCESS +*/ + +static int +mi_list_del_ctx(ctx) + SMFICTX_PTR ctx; +{ + SM_ASSERT(ctx != NULL); + if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD)) + return MI_FAILURE; + + SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link); + return MI_SUCCESS; +} +#endif /* _FFR_WORKERS_POOL */ |