diff options
Diffstat (limited to 'usr.sbin/smtpd/runner.c')
-rw-r--r-- | usr.sbin/smtpd/runner.c | 889 |
1 files changed, 889 insertions, 0 deletions
diff --git a/usr.sbin/smtpd/runner.c b/usr.sbin/smtpd/runner.c new file mode 100644 index 00000000000..9c2c4579c32 --- /dev/null +++ b/usr.sbin/smtpd/runner.c @@ -0,0 +1,889 @@ +/* $OpenBSD: runner.c,v 1.89 2010/06/01 19:47:09 jacekm Exp $ */ + +/* + * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> + * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> + * Copyright (c) 2008-2009 Jacek Masiulaniec <jacekm@dobremiasto.net> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include <sys/types.h> +#include <sys/queue.h> +#include <sys/tree.h> +#include <sys/param.h> +#include <sys/socket.h> +#include <sys/stat.h> + +#include <netinet/in.h> +#include <arpa/inet.h> + +#include <ctype.h> +#include <dirent.h> +#include <err.h> +#include <errno.h> +#include <event.h> +#include <fcntl.h> +#include <libgen.h> +#include <netdb.h> +#include <pwd.h> +#include <signal.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <time.h> +#include <unistd.h> + +#include "smtpd.h" + +void runner_imsg(struct smtpd *, struct imsgev *, struct imsg *); + +__dead void runner_shutdown(void); +void runner_sig_handler(int, short, void *); +void runner_setup_events(struct smtpd *); +void runner_disable_events(struct smtpd *); + +void runner_reset_flags(void); +void runner_process_offline(struct smtpd *); + +void runner_timeout(int, short, void *); + +void runner_process_queue(struct smtpd *); +void runner_process_runqueue(struct smtpd *); +void runner_process_batchqueue(struct smtpd *); + +int runner_message_schedule(struct message *, time_t); + +void runner_purge_run(void); +void runner_purge_message(char *); + +int runner_check_loop(struct message *); + +struct batch *batch_record(struct smtpd *, struct message *); +struct batch *batch_lookup(struct smtpd *, struct message *); + +int runner_force_envelope_schedule(char *); +int runner_force_message_schedule(char *); + +int runner_force_envelope_remove(char *); +int runner_force_message_remove(char *); + +void +runner_imsg(struct smtpd *env, struct imsgev *iev, struct imsg *imsg) +{ + struct message *m; + struct remove *rem; + struct sched *sched; + + switch (imsg->hdr.type) { + case IMSG_QUEUE_MESSAGE_UPDATE: + env->stats->runner.active--; + queue_message_update(imsg->data); + return; + + case IMSG_MDA_SESS_NEW: + env->stats->mda.sessions_active--; + return; + + case IMSG_BATCH_DONE: + env->stats->mta.sessions_active--; + return; + + case IMSG_QUEUE_SCHEDULE: + sched = imsg->data; + sched->ret = 0; + if (valid_message_uid(sched->mid)) + sched->ret = runner_force_envelope_schedule(sched->mid); + else if (valid_message_id(sched->mid)) + sched->ret = runner_force_message_schedule(sched->mid); + imsg_compose_event(iev, IMSG_QUEUE_SCHEDULE, imsg->hdr.peerid, + 0, -1, sched, sizeof *sched); + return; + + case IMSG_QUEUE_REMOVE: + rem = imsg->data; + rem->ret = 0; + if (valid_message_uid(rem->mid)) + rem->ret = runner_force_envelope_remove(rem->mid); + else if (valid_message_id(rem->mid)) + rem->ret = runner_force_message_remove(rem->mid); + imsg_compose_event(iev, IMSG_QUEUE_REMOVE, imsg->hdr.peerid, 0, + -1, rem, sizeof *rem); + return; + + case IMSG_PARENT_ENQUEUE_OFFLINE: + runner_process_offline(env); + return; + + case IMSG_SMTP_ENQUEUE: + m = imsg->data; + if (imsg->fd < 0 || !bounce_session(env, imsg->fd, m)) { + m->status = S_MESSAGE_TEMPFAILURE; + queue_message_update(m); + } + return; + + case IMSG_QUEUE_PAUSE_LOCAL: + env->sc_opts |= SMTPD_MDA_PAUSED; + return; + + case IMSG_QUEUE_RESUME_LOCAL: + env->sc_opts &= ~SMTPD_MDA_PAUSED; + return; + + case IMSG_QUEUE_PAUSE_OUTGOING: + env->sc_opts |= SMTPD_MTA_PAUSED; + return; + + case IMSG_QUEUE_RESUME_OUTGOING: + env->sc_opts &= ~SMTPD_MTA_PAUSED; + return; + + case IMSG_CTL_VERBOSE: + log_verbose(*(int *)imsg->data); + return; + } + + fatalx("runner_imsg: unexpected imsg"); +} + +void +runner_sig_handler(int sig, short event, void *p) +{ + switch (sig) { + case SIGINT: + case SIGTERM: + runner_shutdown(); + break; + default: + fatalx("runner_sig_handler: unexpected signal"); + } +} + +void +runner_shutdown(void) +{ + log_info("runner handler exiting"); + _exit(0); +} + +void +runner_setup_events(struct smtpd *env) +{ + struct timeval tv; + + evtimer_set(&env->sc_ev, runner_timeout, env); + tv.tv_sec = 0; + tv.tv_usec = 10; + evtimer_add(&env->sc_ev, &tv); +} + +void +runner_disable_events(struct smtpd *env) +{ + evtimer_del(&env->sc_ev); +} + +pid_t +runner(struct smtpd *env) +{ + pid_t pid; + struct passwd *pw; + + struct event ev_sigint; + struct event ev_sigterm; + + struct peer peers[] = { + { PROC_QUEUE, imsg_dispatch } + }; + + switch (pid = fork()) { + case -1: + fatal("runner: cannot fork"); + case 0: + break; + default: + return (pid); + } + + purge_config(env, PURGE_EVERYTHING); + + pw = env->sc_pw; + + if (chroot(PATH_SPOOL) == -1) + fatal("runner: chroot"); + if (chdir("/") == -1) + fatal("runner: chdir(\"/\")"); + + smtpd_process = PROC_RUNNER; + setproctitle("%s", env->sc_title[smtpd_process]); + + if (setgroups(1, &pw->pw_gid) || + setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) || + setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid)) + fatal("runner: cannot drop privileges"); + + SPLAY_INIT(&env->batch_queue); + + imsg_callback = runner_imsg; + event_init(); + + signal_set(&ev_sigint, SIGINT, runner_sig_handler, env); + signal_set(&ev_sigterm, SIGTERM, runner_sig_handler, env); + signal_add(&ev_sigint, NULL); + signal_add(&ev_sigterm, NULL); + signal(SIGPIPE, SIG_IGN); + signal(SIGHUP, SIG_IGN); + + /* see fdlimit()-related comment in queue.c */ + fdlimit(1.0); + if ((env->sc_maxconn = availdesc() / 4) < 1) + fatalx("runner: fd starvation"); + + config_pipes(env, peers, nitems(peers)); + config_peers(env, peers, nitems(peers)); + + unlink(PATH_QUEUE "/envelope.tmp"); + runner_reset_flags(); + runner_process_offline(env); + + runner_setup_events(env); + event_dispatch(); + runner_shutdown(); + + return (0); +} + +void +runner_process_offline(struct smtpd *env) +{ + char path[MAXPATHLEN]; + struct qwalk *q; + + q = qwalk_new(PATH_OFFLINE); + + if (qwalk(q, path)) + imsg_compose_event(env->sc_ievs[PROC_QUEUE], + IMSG_PARENT_ENQUEUE_OFFLINE, PROC_PARENT, 0, -1, path, + strlen(path) + 1); + + qwalk_close(q); +} + +void +runner_reset_flags(void) +{ + char path[MAXPATHLEN]; + struct message message; + struct qwalk *q; + + q = qwalk_new(PATH_QUEUE); + + while (qwalk(q, path)) { + while (! queue_load_envelope(&message, basename(path))) + sleep(1); + message_reset_flags(&message); + } + + qwalk_close(q); +} + +void +runner_timeout(int fd, short event, void *p) +{ + struct smtpd *env = p; + struct timeval tv; + + runner_purge_run(); + + runner_process_queue(env); + runner_process_runqueue(env); + runner_process_batchqueue(env); + + tv.tv_sec = 1; + tv.tv_usec = 0; + evtimer_add(&env->sc_ev, &tv); +} + +void +runner_process_queue(struct smtpd *env) +{ + char path[MAXPATHLEN]; + char rqpath[MAXPATHLEN]; + struct message message; + time_t now; + size_t mta_av, mda_av, bnc_av; + struct qwalk *q; + + mta_av = env->sc_maxconn - env->stats->mta.sessions_active; + mda_av = env->sc_maxconn - env->stats->mda.sessions_active; + bnc_av = env->sc_maxconn - env->stats->runner.bounces_active; + + now = time(NULL); + q = qwalk_new(PATH_QUEUE); + + while (qwalk(q, path)) { + if (! queue_load_envelope(&message, basename(path))) + continue; + + if (message.type & T_MDA_MESSAGE) { + if (env->sc_opts & SMTPD_MDA_PAUSED) + continue; + if (mda_av == 0) + continue; + } + + if (message.type & T_MTA_MESSAGE) { + if (env->sc_opts & SMTPD_MTA_PAUSED) + continue; + if (mta_av == 0) + continue; + } + + if (message.type & T_BOUNCE_MESSAGE) { + if (env->sc_opts & (SMTPD_MDA_PAUSED|SMTPD_MTA_PAUSED)) + continue; + if (bnc_av == 0) + continue; + } + + if (! runner_message_schedule(&message, now)) + continue; + + if (runner_check_loop(&message)) { + message_set_errormsg(&message, "loop has been detected"); + bounce_record_message(&message); + queue_remove_envelope(&message); + continue; + } + + message.flags |= F_MESSAGE_SCHEDULED; + message.flags &= ~F_MESSAGE_FORCESCHEDULE; + queue_update_envelope(&message); + + if (! bsnprintf(rqpath, sizeof(rqpath), "%s/%s", PATH_RUNQUEUE, + basename(path))) + fatalx("runner_process_queue: snprintf"); + + if (symlink(path, rqpath) == -1) { + if (errno == EEXIST) + continue; + if (errno == ENOSPC) + break; + fatal("runner_process_queue: symlink"); + } + + if (message.type & T_MDA_MESSAGE) + mda_av--; + if (message.type & T_MTA_MESSAGE) + mta_av--; + if (message.type & T_BOUNCE_MESSAGE) + bnc_av--; + } + + qwalk_close(q); +} + +void +runner_process_runqueue(struct smtpd *env) +{ + char path[MAXPATHLEN]; + struct message message; + time_t tm; + struct batch *batchp; + struct message *messagep; + struct qwalk *q; + + tm = time(NULL); + + q = qwalk_new(PATH_RUNQUEUE); + + while (qwalk(q, path)) { + unlink(path); + + if (! queue_load_envelope(&message, basename(path))) + continue; + + if (message.flags & F_MESSAGE_PROCESSING) + continue; + + message.lasttry = tm; + message.flags &= ~F_MESSAGE_SCHEDULED; + message.flags |= F_MESSAGE_PROCESSING; + + if (! queue_update_envelope(&message)) + continue; + + messagep = calloc(1, sizeof (struct message)); + if (messagep == NULL) + fatal("runner_process_runqueue: calloc"); + *messagep = message; + + messagep->batch_id = 0; + batchp = batch_lookup(env, messagep); + if (batchp != NULL) + messagep->batch_id = batchp->id; + + batchp = batch_record(env, messagep); + if (messagep->batch_id == 0) + messagep->batch_id = batchp->id; + } + + qwalk_close(q); +} + +void +runner_process_batchqueue(struct smtpd *env) +{ + struct batch *batchp; + struct message *m; + int fd; + + while ((batchp = SPLAY_MIN(batchtree, &env->batch_queue)) != NULL) { + switch (batchp->type) { + case T_BOUNCE_BATCH: + while ((m = TAILQ_FIRST(&batchp->messages))) { + imsg_compose_event(env->sc_ievs[PROC_QUEUE], + IMSG_SMTP_ENQUEUE, PROC_SMTP, 0, -1, m, + sizeof *m); + TAILQ_REMOVE(&batchp->messages, m, entry); + free(m); + } + env->stats->runner.bounces_active++; + env->stats->runner.bounces++; + break; + + case T_MDA_BATCH: + m = TAILQ_FIRST(&batchp->messages); + fd = queue_open_message_file(m->message_id); + imsg_compose_event(env->sc_ievs[PROC_QUEUE], + IMSG_MDA_SESS_NEW, PROC_MDA, 0, fd, m, + sizeof *m); + TAILQ_REMOVE(&batchp->messages, m, entry); + free(m); + env->stats->mda.sessions_active++; + env->stats->mda.sessions++; + break; + + case T_MTA_BATCH: + imsg_compose_event(env->sc_ievs[PROC_QUEUE], + IMSG_BATCH_CREATE, PROC_MTA, 0, -1, batchp, + sizeof *batchp); + while ((m = TAILQ_FIRST(&batchp->messages))) { + imsg_compose_event(env->sc_ievs[PROC_QUEUE], + IMSG_BATCH_APPEND, PROC_MTA, 0, -1, m, + sizeof *m); + TAILQ_REMOVE(&batchp->messages, m, entry); + free(m); + } + imsg_compose_event(env->sc_ievs[PROC_QUEUE], + IMSG_BATCH_CLOSE, PROC_MTA, 0, -1, batchp, + sizeof *batchp); + env->stats->mta.sessions_active++; + env->stats->mta.sessions++; + break; + + default: + fatalx("runner_process_batchqueue: unknown type"); + } + + SPLAY_REMOVE(batchtree, &env->batch_queue, batchp); + free(batchp); + } +} + +int +runner_message_schedule(struct message *messagep, time_t tm) +{ + time_t delay; + + if (messagep->flags & (F_MESSAGE_SCHEDULED|F_MESSAGE_PROCESSING)) + return 0; + + if (messagep->flags & F_MESSAGE_FORCESCHEDULE) + return 1; + + /* Batch has been in the queue for too long and expired */ + if (tm - messagep->creation >= SMTPD_QUEUE_EXPIRY) { + message_set_errormsg(messagep, "message expired after sitting in queue for %d days", + SMTPD_QUEUE_EXPIRY / 60 / 60 / 24); + bounce_record_message(messagep); + queue_remove_envelope(messagep); + return 0; + } + + if (messagep->lasttry == 0) + return 1; + + delay = SMTPD_QUEUE_MAXINTERVAL; + + // recompute path + + if (messagep->type == T_MDA_MESSAGE || + messagep->type == T_BOUNCE_MESSAGE) { + if (messagep->retry < 5) + return 1; + + if (messagep->retry < 15) + delay = (messagep->retry * 60) + arc4random_uniform(60); + } + + if (messagep->type == T_MTA_MESSAGE) { + if (messagep->retry < 3) + delay = SMTPD_QUEUE_INTERVAL; + else if (messagep->retry <= 7) { + delay = SMTPD_QUEUE_INTERVAL * (1 << (messagep->retry - 3)); + if (delay > SMTPD_QUEUE_MAXINTERVAL) + delay = SMTPD_QUEUE_MAXINTERVAL; + } + } + + if (tm >= messagep->lasttry + delay) + return 1; + + return 0; +} + +int +runner_force_envelope_schedule(char *mid) +{ + struct message message; + + if (! queue_load_envelope(&message, mid)) + return 0; + + if (message.flags & (F_MESSAGE_PROCESSING|F_MESSAGE_SCHEDULED)) + return 1; + + message.flags |= F_MESSAGE_FORCESCHEDULE; + + if (! queue_update_envelope(&message)) + return 0; + + return 1; +} + +int +runner_force_message_schedule(char *mid) +{ + char path[MAXPATHLEN]; + DIR *dirp; + struct dirent *dp; + + if (! bsnprintf(path, MAXPATHLEN, "%s/%d/%s/envelopes", + PATH_QUEUE, queue_hash(mid), mid)) + return 0; + + dirp = opendir(path); + if (dirp == NULL) + return 0; + + while ((dp = readdir(dirp)) != NULL) { + if (valid_message_uid(dp->d_name)) + runner_force_envelope_schedule(dp->d_name); + } + closedir(dirp); + + return 1; +} + + +int +runner_force_envelope_remove(char *mid) +{ + struct message message; + + if (! queue_load_envelope(&message, mid)) + return 0; + + if (message.flags & (F_MESSAGE_PROCESSING|F_MESSAGE_SCHEDULED)) + return 0; + + if (! queue_remove_envelope(&message)) + return 0; + + return 1; +} + +int +runner_force_message_remove(char *mid) +{ + char path[MAXPATHLEN]; + DIR *dirp; + struct dirent *dp; + + if (! bsnprintf(path, MAXPATHLEN, "%s/%d/%s/envelopes", + PATH_QUEUE, queue_hash(mid), mid)) + return 0; + + dirp = opendir(path); + if (dirp == NULL) + return 0; + + while ((dp = readdir(dirp)) != NULL) { + if (valid_message_uid(dp->d_name)) + runner_force_envelope_remove(dp->d_name); + } + closedir(dirp); + + return 1; +} + +void +runner_purge_run(void) +{ + char path[MAXPATHLEN]; + struct qwalk *q; + + q = qwalk_new(PATH_PURGE); + + while (qwalk(q, path)) + runner_purge_message(basename(path)); + + qwalk_close(q); +} + +void +runner_purge_message(char *msgid) +{ + char rootdir[MAXPATHLEN]; + char evpdir[MAXPATHLEN]; + char evppath[MAXPATHLEN]; + char msgpath[MAXPATHLEN]; + DIR *dirp; + struct dirent *dp; + + if (! bsnprintf(rootdir, sizeof(rootdir), "%s/%s", PATH_PURGE, msgid)) + fatal("runner_purge_message: snprintf"); + + if (! bsnprintf(evpdir, sizeof(evpdir), "%s%s", rootdir, + PATH_ENVELOPES)) + fatal("runner_purge_message: snprintf"); + + if (! bsnprintf(msgpath, sizeof(msgpath), "%s/message", rootdir)) + fatal("runner_purge_message: snprintf"); + + if (unlink(msgpath) == -1) + if (errno != ENOENT) + fatal("runner_purge_message: unlink"); + + dirp = opendir(evpdir); + if (dirp == NULL) { + if (errno == ENOENT) + goto delroot; + fatal("runner_purge_message: opendir"); + } + while ((dp = readdir(dirp)) != NULL) { + if (strcmp(dp->d_name, ".") == 0 || + strcmp(dp->d_name, "..") == 0) + continue; + if (! bsnprintf(evppath, sizeof(evppath), "%s/%s", evpdir, + dp->d_name)) + fatal("runner_purge_message: snprintf"); + + if (unlink(evppath) == -1) + if (errno != ENOENT) + fatal("runner_purge_message: unlink"); + } + closedir(dirp); + + if (rmdir(evpdir) == -1) + if (errno != ENOENT) + fatal("runner_purge_message: rmdir"); + +delroot: + if (rmdir(rootdir) == -1) + if (errno != ENOENT) + fatal("runner_purge_message: rmdir"); +} + +struct batch * +batch_record(struct smtpd *env, struct message *messagep) +{ + struct batch *batchp; + struct path *path; + + batchp = NULL; + if (messagep->batch_id != 0) { + batchp = batch_by_id(env, messagep->batch_id); + if (batchp == NULL) + fatalx("batch_record: internal inconsistency."); + } + if (batchp == NULL) { + batchp = calloc(1, sizeof(struct batch)); + if (batchp == NULL) + fatal("batch_record: calloc"); + + batchp->id = generate_uid(); + + (void)strlcpy(batchp->message_id, messagep->message_id, + sizeof(batchp->message_id)); + TAILQ_INIT(&batchp->messages); + SPLAY_INSERT(batchtree, &env->batch_queue, batchp); + + if (messagep->type & T_BOUNCE_MESSAGE) { + batchp->type = T_BOUNCE_BATCH; + path = &messagep->sender; + } + else { + path = &messagep->recipient; + } + batchp->rule = path->rule; + + (void)strlcpy(batchp->hostname, path->domain, + sizeof(batchp->hostname)); + + if (batchp->type != T_BOUNCE_BATCH) { + if (IS_MAILBOX(*path) || IS_EXT(*path)) { + batchp->type = T_MDA_BATCH; + } + else { + batchp->type = T_MTA_BATCH; + } + } + } + + TAILQ_INSERT_TAIL(&batchp->messages, messagep, entry); + env->stats->runner.active++; + return batchp; +} + +struct batch * +batch_lookup(struct smtpd *env, struct message *message) +{ + struct batch *batchp; + struct batch lookup; + + /* We only support delivery of one message at a time, in MDA + * and bounces messages. + */ + if (message->type == T_BOUNCE_MESSAGE || message->type == T_MDA_MESSAGE) + return NULL; + + /* If message->batch_id != 0, we can retrieve batch by id */ + if (message->batch_id != 0) { + lookup.id = message->batch_id; + return SPLAY_FIND(batchtree, &env->batch_queue, &lookup); + } + + /* We do not know the batch_id yet, maybe it was created but we could not + * be notified, or it just does not exist. Let's scan to see if we can do + * a match based on our message_id and flags. + */ + SPLAY_FOREACH(batchp, batchtree, &env->batch_queue) { + + if (batchp->type != message->type) + continue; + + if (strcasecmp(batchp->message_id, message->message_id) != 0) + continue; + + if (batchp->type & T_MTA_BATCH) + if (strcasecmp(batchp->hostname, message->recipient.domain) != 0) + continue; + + break; + } + + return batchp; +} + +int +batch_cmp(struct batch *s1, struct batch *s2) +{ + /* + * do not return u_int64_t's + */ + if (s1->id < s2->id) + return (-1); + + if (s1->id > s2->id) + return (1); + + return (0); +} + +int +runner_check_loop(struct message *messagep) +{ + int fd; + FILE *fp; + char *buf, *lbuf; + size_t len; + struct path chkpath; + int ret = 0; + int rcvcount = 0; + + fd = queue_open_message_file(messagep->message_id); + if ((fp = fdopen(fd, "r")) == NULL) + fatal("fdopen"); + + lbuf = NULL; + while ((buf = fgetln(fp, &len))) { + if (buf[len - 1] == '\n') + buf[len - 1] = '\0'; + else { + /* EOF without EOL, copy and add the NUL */ + if ((lbuf = malloc(len + 1)) == NULL) + err(1, NULL); + memcpy(lbuf, buf, len); + lbuf[len] = '\0'; + buf = lbuf; + } + + if (strchr(buf, ':') == NULL && !isspace((int)*buf)) + break; + + if (strncasecmp("Received: ", buf, 10) == 0) { + rcvcount++; + if (rcvcount == MAX_HOPS_COUNT) { + ret = 1; + break; + } + } + + else if (strncasecmp("Delivered-To: ", buf, 14) == 0) { + struct path rcpt; + + bzero(&chkpath, sizeof (struct path)); + if (! recipient_to_path(&chkpath, buf + 14)) + continue; + + rcpt = messagep->recipient; + if (messagep->type == T_BOUNCE_MESSAGE) + rcpt = messagep->sender; + + if (strcasecmp(chkpath.user, rcpt.user) == 0 && + strcasecmp(chkpath.domain, rcpt.domain) == 0) { + ret = 1; + break; + } + } + } + free(lbuf); + + fclose(fp); + return ret; +} + +void +message_reset_flags(struct message *m) +{ + m->flags &= ~F_MESSAGE_SCHEDULED; + m->flags &= ~F_MESSAGE_PROCESSING; + + while (! queue_update_envelope(m)) + sleep(1); +} + +SPLAY_GENERATE(batchtree, batch, b_nodes, batch_cmp); |