summaryrefslogtreecommitdiff
path: root/usr.sbin/smtpd/runner.c
diff options
context:
space:
mode:
authorJacek Masiulaniec <jacekm@cvs.openbsd.org>2010-05-31 23:38:57 +0000
committerJacek Masiulaniec <jacekm@cvs.openbsd.org>2010-05-31 23:38:57 +0000
commitd307483c8c212fa059ff0cd0e59abc3e3d3b2ca3 (patch)
tree39d8b72f5535369d2504027c31822e039f4f731a /usr.sbin/smtpd/runner.c
parent591293015f3e6c1412e51ad20d7817e6987a652f (diff)
Rewrite entire queue code.
Major goals: 1) Fix bad performance caused by the runner process doing full queue read in 1s intervals. My Soekris can now happily accept >50 msg/s while having multi-thousand queue; before, one hundred queue would bring the system to its knees. 2) Introduce Qmail-like scheduler that doesn't write as much to the disk so that it needs less code for servicing error conditions, which in some places can be tricky to get right. 3) Introduce separation between the scheduler and the backend; these two queue aspects shouldn't be too tied too each other. This means that eg. storing queue in SQL requires rewrite of just queue_backend.c. 4) Make on-disk queue format architecture independent, and more easily extensible, to reduce number of flag days in the future. Minor goals: ENOSPC no longer prevents delivery attempts, fixed session limiting for relayed mail, improved batching of "relay via" mails, human-readable mailq output, "show queue raw" command, clearer logging, sending of single bounce about multiple recipients, exact delay= computation, zero delay between deliveries while within session limit (currently 1s delay between re-scheduling is enforced), mta no longer requests content fd, corrected session limit for bounce submissions, tiny <100B queue files instead of multi-KB, detect loops before accepting mail, reduce traffic on imsg channels by killing enormous struct submit_status.
Diffstat (limited to 'usr.sbin/smtpd/runner.c')
-rw-r--r--usr.sbin/smtpd/runner.c889
1 files changed, 0 insertions, 889 deletions
diff --git a/usr.sbin/smtpd/runner.c b/usr.sbin/smtpd/runner.c
deleted file mode 100644
index 40a0f912a1f..00000000000
--- a/usr.sbin/smtpd/runner.c
+++ /dev/null
@@ -1,889 +0,0 @@
-/* $OpenBSD: runner.c,v 1.87 2010/05/19 20:57:10 gilles Exp $ */
-
-/*
- * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
- * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
- * Copyright (c) 2008-2009 Jacek Masiulaniec <jacekm@dobremiasto.net>
- *
- * Permission to use, copy, modify, and distribute this software for any
- * purpose with or without fee is hereby granted, provided that the above
- * copyright notice and this permission notice appear in all copies.
- *
- * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
- * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
- * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
- * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- */
-
-#include <sys/types.h>
-#include <sys/queue.h>
-#include <sys/tree.h>
-#include <sys/param.h>
-#include <sys/socket.h>
-#include <sys/stat.h>
-
-#include <netinet/in.h>
-#include <arpa/inet.h>
-
-#include <ctype.h>
-#include <dirent.h>
-#include <err.h>
-#include <errno.h>
-#include <event.h>
-#include <fcntl.h>
-#include <libgen.h>
-#include <netdb.h>
-#include <pwd.h>
-#include <signal.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <time.h>
-#include <unistd.h>
-
-#include "smtpd.h"
-
-void runner_imsg(struct smtpd *, struct imsgev *, struct imsg *);
-
-__dead void runner_shutdown(void);
-void runner_sig_handler(int, short, void *);
-void runner_setup_events(struct smtpd *);
-void runner_disable_events(struct smtpd *);
-
-void runner_reset_flags(void);
-void runner_process_offline(struct smtpd *);
-
-void runner_timeout(int, short, void *);
-
-void runner_process_queue(struct smtpd *);
-void runner_process_runqueue(struct smtpd *);
-void runner_process_batchqueue(struct smtpd *);
-
-int runner_message_schedule(struct message *, time_t);
-
-void runner_purge_run(void);
-void runner_purge_message(char *);
-
-int runner_check_loop(struct message *);
-
-struct batch *batch_record(struct smtpd *, struct message *);
-struct batch *batch_lookup(struct smtpd *, struct message *);
-
-int runner_force_envelope_schedule(char *);
-int runner_force_message_schedule(char *);
-
-int runner_force_envelope_remove(char *);
-int runner_force_message_remove(char *);
-
-void
-runner_imsg(struct smtpd *env, struct imsgev *iev, struct imsg *imsg)
-{
- struct message *m;
- struct remove *rem;
- struct sched *sched;
-
- switch (imsg->hdr.type) {
- case IMSG_QUEUE_MESSAGE_UPDATE:
- env->stats->runner.active--;
- queue_message_update(imsg->data);
- return;
-
- case IMSG_MDA_SESS_NEW:
- env->stats->mda.sessions_active--;
- return;
-
- case IMSG_BATCH_DONE:
- env->stats->mta.sessions_active--;
- return;
-
- case IMSG_QUEUE_SCHEDULE:
- sched = imsg->data;
- sched->ret = 0;
- if (valid_message_uid(sched->mid))
- sched->ret = runner_force_envelope_schedule(sched->mid);
- else if (valid_message_id(sched->mid))
- sched->ret = runner_force_message_schedule(sched->mid);
- imsg_compose_event(iev, IMSG_QUEUE_SCHEDULE, imsg->hdr.peerid,
- 0, -1, sched, sizeof *sched);
- return;
-
- case IMSG_QUEUE_REMOVE:
- rem = imsg->data;
- rem->ret = 0;
- if (valid_message_uid(rem->mid))
- rem->ret = runner_force_envelope_remove(rem->mid);
- else if (valid_message_id(rem->mid))
- rem->ret = runner_force_message_remove(rem->mid);
- imsg_compose_event(iev, IMSG_QUEUE_REMOVE, imsg->hdr.peerid, 0,
- -1, rem, sizeof *rem);
- return;
-
- case IMSG_PARENT_ENQUEUE_OFFLINE:
- runner_process_offline(env);
- return;
-
- case IMSG_SMTP_ENQUEUE:
- m = imsg->data;
- if (imsg->fd < 0 || !bounce_session(env, imsg->fd, m)) {
- m->status = S_MESSAGE_TEMPFAILURE;
- queue_message_update(m);
- }
- return;
-
- case IMSG_QUEUE_PAUSE_LOCAL:
- env->sc_opts |= SMTPD_MDA_PAUSED;
- return;
-
- case IMSG_QUEUE_RESUME_LOCAL:
- env->sc_opts &= ~SMTPD_MDA_PAUSED;
- return;
-
- case IMSG_QUEUE_PAUSE_OUTGOING:
- env->sc_opts |= SMTPD_MTA_PAUSED;
- return;
-
- case IMSG_QUEUE_RESUME_OUTGOING:
- env->sc_opts &= ~SMTPD_MTA_PAUSED;
- return;
-
- case IMSG_CTL_VERBOSE:
- log_verbose(*(int *)imsg->data);
- return;
- }
-
- fatalx("runner_imsg: unexpected imsg");
-}
-
-void
-runner_sig_handler(int sig, short event, void *p)
-{
- switch (sig) {
- case SIGINT:
- case SIGTERM:
- runner_shutdown();
- break;
- default:
- fatalx("runner_sig_handler: unexpected signal");
- }
-}
-
-void
-runner_shutdown(void)
-{
- log_info("runner handler exiting");
- _exit(0);
-}
-
-void
-runner_setup_events(struct smtpd *env)
-{
- struct timeval tv;
-
- evtimer_set(&env->sc_ev, runner_timeout, env);
- tv.tv_sec = 0;
- tv.tv_usec = 10;
- evtimer_add(&env->sc_ev, &tv);
-}
-
-void
-runner_disable_events(struct smtpd *env)
-{
- evtimer_del(&env->sc_ev);
-}
-
-pid_t
-runner(struct smtpd *env)
-{
- pid_t pid;
- struct passwd *pw;
-
- struct event ev_sigint;
- struct event ev_sigterm;
-
- struct peer peers[] = {
- { PROC_QUEUE, imsg_dispatch }
- };
-
- switch (pid = fork()) {
- case -1:
- fatal("runner: cannot fork");
- case 0:
- break;
- default:
- return (pid);
- }
-
- purge_config(env, PURGE_EVERYTHING);
-
- pw = env->sc_pw;
-
- if (chroot(PATH_SPOOL) == -1)
- fatal("runner: chroot");
- if (chdir("/") == -1)
- fatal("runner: chdir(\"/\")");
-
- smtpd_process = PROC_RUNNER;
- setproctitle("%s", env->sc_title[smtpd_process]);
-
- if (setgroups(1, &pw->pw_gid) ||
- setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) ||
- setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
- fatal("runner: cannot drop privileges");
-
- SPLAY_INIT(&env->batch_queue);
-
- imsg_callback = runner_imsg;
- event_init();
-
- signal_set(&ev_sigint, SIGINT, runner_sig_handler, env);
- signal_set(&ev_sigterm, SIGTERM, runner_sig_handler, env);
- signal_add(&ev_sigint, NULL);
- signal_add(&ev_sigterm, NULL);
- signal(SIGPIPE, SIG_IGN);
- signal(SIGHUP, SIG_IGN);
-
- /* see fdlimit()-related comment in queue.c */
- fdlimit(1.0);
- if ((env->sc_maxconn = availdesc() / 4) < 1)
- fatalx("runner: fd starvation");
-
- config_pipes(env, peers, nitems(peers));
- config_peers(env, peers, nitems(peers));
-
- unlink(PATH_QUEUE "/envelope.tmp");
- runner_reset_flags();
- runner_process_offline(env);
-
- runner_setup_events(env);
- event_dispatch();
- runner_shutdown();
-
- return (0);
-}
-
-void
-runner_process_offline(struct smtpd *env)
-{
- char path[MAXPATHLEN];
- struct qwalk *q;
-
- q = qwalk_new(PATH_OFFLINE);
-
- if (qwalk(q, path))
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_PARENT_ENQUEUE_OFFLINE, PROC_PARENT, 0, -1, path,
- strlen(path) + 1);
-
- qwalk_close(q);
-}
-
-void
-runner_reset_flags(void)
-{
- char path[MAXPATHLEN];
- struct message message;
- struct qwalk *q;
-
- q = qwalk_new(PATH_QUEUE);
-
- while (qwalk(q, path)) {
- while (! queue_load_envelope(&message, basename(path)))
- sleep(1);
- message_reset_flags(&message);
- }
-
- qwalk_close(q);
-}
-
-void
-runner_timeout(int fd, short event, void *p)
-{
- struct smtpd *env = p;
- struct timeval tv;
-
- runner_purge_run();
-
- runner_process_queue(env);
- runner_process_runqueue(env);
- runner_process_batchqueue(env);
-
- tv.tv_sec = 1;
- tv.tv_usec = 0;
- evtimer_add(&env->sc_ev, &tv);
-}
-
-void
-runner_process_queue(struct smtpd *env)
-{
- char path[MAXPATHLEN];
- char rqpath[MAXPATHLEN];
- struct message message;
- time_t now;
- size_t mta_av, mda_av, bnc_av;
- struct qwalk *q;
-
- mta_av = env->sc_maxconn - env->stats->mta.sessions_active;
- mda_av = env->sc_maxconn - env->stats->mda.sessions_active;
- bnc_av = env->sc_maxconn - env->stats->runner.bounces_active;
-
- now = time(NULL);
- q = qwalk_new(PATH_QUEUE);
-
- while (qwalk(q, path)) {
- if (! queue_load_envelope(&message, basename(path)))
- continue;
-
- if (message.type & T_MDA_MESSAGE) {
- if (env->sc_opts & SMTPD_MDA_PAUSED)
- continue;
- if (mda_av == 0)
- continue;
- }
-
- if (message.type & T_MTA_MESSAGE) {
- if (env->sc_opts & SMTPD_MTA_PAUSED)
- continue;
- if (mta_av == 0)
- continue;
- }
-
- if (message.type & T_BOUNCE_MESSAGE) {
- if (env->sc_opts & (SMTPD_MDA_PAUSED|SMTPD_MTA_PAUSED))
- continue;
- if (bnc_av == 0)
- continue;
- }
-
- if (! runner_message_schedule(&message, now))
- continue;
-
- if (runner_check_loop(&message)) {
- message_set_errormsg(&message, "loop has been detected");
- bounce_record_message(&message);
- queue_remove_envelope(&message);
- continue;
- }
-
- message.flags |= F_MESSAGE_SCHEDULED;
- message.flags &= ~F_MESSAGE_FORCESCHEDULE;
- queue_update_envelope(&message);
-
- if (! bsnprintf(rqpath, sizeof(rqpath), "%s/%s", PATH_RUNQUEUE,
- basename(path)))
- fatalx("runner_process_queue: snprintf");
-
- if (symlink(path, rqpath) == -1) {
- if (errno == EEXIST)
- continue;
- if (errno == ENOSPC)
- break;
- fatal("runner_process_queue: symlink");
- }
-
- if (message.type & T_MDA_MESSAGE)
- mda_av--;
- if (message.type & T_MTA_MESSAGE)
- mta_av--;
- if (message.type & T_BOUNCE_MESSAGE)
- bnc_av--;
- }
-
- qwalk_close(q);
-}
-
-void
-runner_process_runqueue(struct smtpd *env)
-{
- char path[MAXPATHLEN];
- struct message message;
- time_t tm;
- struct batch *batchp;
- struct message *messagep;
- struct qwalk *q;
-
- tm = time(NULL);
-
- q = qwalk_new(PATH_RUNQUEUE);
-
- while (qwalk(q, path)) {
- unlink(path);
-
- if (! queue_load_envelope(&message, basename(path)))
- continue;
-
- if (message.flags & F_MESSAGE_PROCESSING)
- continue;
-
- message.lasttry = tm;
- message.flags &= ~F_MESSAGE_SCHEDULED;
- message.flags |= F_MESSAGE_PROCESSING;
-
- if (! queue_update_envelope(&message))
- continue;
-
- messagep = calloc(1, sizeof (struct message));
- if (messagep == NULL)
- fatal("runner_process_runqueue: calloc");
- *messagep = message;
-
- messagep->batch_id = 0;
- batchp = batch_lookup(env, messagep);
- if (batchp != NULL)
- messagep->batch_id = batchp->id;
-
- batchp = batch_record(env, messagep);
- if (messagep->batch_id == 0)
- messagep->batch_id = batchp->id;
- }
-
- qwalk_close(q);
-}
-
-void
-runner_process_batchqueue(struct smtpd *env)
-{
- struct batch *batchp;
- struct message *m;
- int fd;
-
- while ((batchp = SPLAY_MIN(batchtree, &env->batch_queue)) != NULL) {
- switch (batchp->type) {
- case T_BOUNCE_BATCH:
- while ((m = TAILQ_FIRST(&batchp->messages))) {
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_SMTP_ENQUEUE, PROC_SMTP, 0, -1, m,
- sizeof *m);
- TAILQ_REMOVE(&batchp->messages, m, entry);
- free(m);
- }
- env->stats->runner.bounces_active++;
- env->stats->runner.bounces++;
- break;
-
- case T_MDA_BATCH:
- m = TAILQ_FIRST(&batchp->messages);
- fd = queue_open_message_file(m->message_id);
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_MDA_SESS_NEW, PROC_MDA, 0, fd, m,
- sizeof *m);
- TAILQ_REMOVE(&batchp->messages, m, entry);
- free(m);
- env->stats->mda.sessions_active++;
- env->stats->mda.sessions++;
- break;
-
- case T_MTA_BATCH:
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_BATCH_CREATE, PROC_MTA, 0, -1, batchp,
- sizeof *batchp);
- while ((m = TAILQ_FIRST(&batchp->messages))) {
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_BATCH_APPEND, PROC_MTA, 0, -1, m,
- sizeof *m);
- TAILQ_REMOVE(&batchp->messages, m, entry);
- free(m);
- }
- imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_BATCH_CLOSE, PROC_MTA, 0, -1, batchp,
- sizeof *batchp);
- env->stats->mta.sessions_active++;
- env->stats->mta.sessions++;
- break;
-
- default:
- fatalx("runner_process_batchqueue: unknown type");
- }
-
- SPLAY_REMOVE(batchtree, &env->batch_queue, batchp);
- free(batchp);
- }
-}
-
-int
-runner_message_schedule(struct message *messagep, time_t tm)
-{
- time_t delay;
-
- if (messagep->flags & (F_MESSAGE_SCHEDULED|F_MESSAGE_PROCESSING))
- return 0;
-
- if (messagep->flags & F_MESSAGE_FORCESCHEDULE)
- return 1;
-
- /* Batch has been in the queue for too long and expired */
- if (tm - messagep->creation >= SMTPD_QUEUE_EXPIRY) {
- message_set_errormsg(messagep, "message expired after sitting in queue for %d days",
- SMTPD_QUEUE_EXPIRY / 60 / 60 / 24);
- bounce_record_message(messagep);
- queue_remove_envelope(messagep);
- return 0;
- }
-
- if (messagep->lasttry == 0)
- return 1;
-
- delay = SMTPD_QUEUE_MAXINTERVAL;
-
- // recompute path
-
- if (messagep->type == T_MDA_MESSAGE ||
- messagep->type == T_BOUNCE_MESSAGE) {
- if (messagep->retry < 5)
- return 1;
-
- if (messagep->retry < 15)
- delay = (messagep->retry * 60) + arc4random_uniform(60);
- }
-
- if (messagep->type == T_MTA_MESSAGE) {
- if (messagep->retry < 3)
- delay = SMTPD_QUEUE_INTERVAL;
- else if (messagep->retry <= 7) {
- delay = SMTPD_QUEUE_INTERVAL * (1 << (messagep->retry - 3));
- if (delay > SMTPD_QUEUE_MAXINTERVAL)
- delay = SMTPD_QUEUE_MAXINTERVAL;
- }
- }
-
- if (tm >= messagep->lasttry + delay)
- return 1;
-
- return 0;
-}
-
-int
-runner_force_envelope_schedule(char *mid)
-{
- struct message message;
-
- if (! queue_load_envelope(&message, mid))
- return 0;
-
- if (message.flags & (F_MESSAGE_PROCESSING|F_MESSAGE_SCHEDULED))
- return 1;
-
- message.flags |= F_MESSAGE_FORCESCHEDULE;
-
- if (! queue_update_envelope(&message))
- return 0;
-
- return 1;
-}
-
-int
-runner_force_message_schedule(char *mid)
-{
- char path[MAXPATHLEN];
- DIR *dirp;
- struct dirent *dp;
-
- if (! bsnprintf(path, MAXPATHLEN, "%s/%d/%s/envelopes",
- PATH_QUEUE, queue_hash(mid), mid))
- return 0;
-
- dirp = opendir(path);
- if (dirp == NULL)
- return 0;
-
- while ((dp = readdir(dirp)) != NULL) {
- if (valid_message_uid(dp->d_name))
- runner_force_envelope_schedule(dp->d_name);
- }
- closedir(dirp);
-
- return 1;
-}
-
-
-int
-runner_force_envelope_remove(char *mid)
-{
- struct message message;
-
- if (! queue_load_envelope(&message, mid))
- return 0;
-
- if (message.flags & (F_MESSAGE_PROCESSING|F_MESSAGE_SCHEDULED))
- return 0;
-
- if (! queue_remove_envelope(&message))
- return 0;
-
- return 1;
-}
-
-int
-runner_force_message_remove(char *mid)
-{
- char path[MAXPATHLEN];
- DIR *dirp;
- struct dirent *dp;
-
- if (! bsnprintf(path, MAXPATHLEN, "%s/%d/%s/envelopes",
- PATH_QUEUE, queue_hash(mid), mid))
- return 0;
-
- dirp = opendir(path);
- if (dirp == NULL)
- return 0;
-
- while ((dp = readdir(dirp)) != NULL) {
- if (valid_message_uid(dp->d_name))
- runner_force_envelope_remove(dp->d_name);
- }
- closedir(dirp);
-
- return 1;
-}
-
-void
-runner_purge_run(void)
-{
- char path[MAXPATHLEN];
- struct qwalk *q;
-
- q = qwalk_new(PATH_PURGE);
-
- while (qwalk(q, path))
- runner_purge_message(basename(path));
-
- qwalk_close(q);
-}
-
-void
-runner_purge_message(char *msgid)
-{
- char rootdir[MAXPATHLEN];
- char evpdir[MAXPATHLEN];
- char evppath[MAXPATHLEN];
- char msgpath[MAXPATHLEN];
- DIR *dirp;
- struct dirent *dp;
-
- if (! bsnprintf(rootdir, sizeof(rootdir), "%s/%s", PATH_PURGE, msgid))
- fatal("runner_purge_message: snprintf");
-
- if (! bsnprintf(evpdir, sizeof(evpdir), "%s%s", rootdir,
- PATH_ENVELOPES))
- fatal("runner_purge_message: snprintf");
-
- if (! bsnprintf(msgpath, sizeof(msgpath), "%s/message", rootdir))
- fatal("runner_purge_message: snprintf");
-
- if (unlink(msgpath) == -1)
- if (errno != ENOENT)
- fatal("runner_purge_message: unlink");
-
- dirp = opendir(evpdir);
- if (dirp == NULL) {
- if (errno == ENOENT)
- goto delroot;
- fatal("runner_purge_message: opendir");
- }
- while ((dp = readdir(dirp)) != NULL) {
- if (strcmp(dp->d_name, ".") == 0 ||
- strcmp(dp->d_name, "..") == 0)
- continue;
- if (! bsnprintf(evppath, sizeof(evppath), "%s/%s", evpdir,
- dp->d_name))
- fatal("runner_purge_message: snprintf");
-
- if (unlink(evppath) == -1)
- if (errno != ENOENT)
- fatal("runner_purge_message: unlink");
- }
- closedir(dirp);
-
- if (rmdir(evpdir) == -1)
- if (errno != ENOENT)
- fatal("runner_purge_message: rmdir");
-
-delroot:
- if (rmdir(rootdir) == -1)
- if (errno != ENOENT)
- fatal("runner_purge_message: rmdir");
-}
-
-struct batch *
-batch_record(struct smtpd *env, struct message *messagep)
-{
- struct batch *batchp;
- struct path *path;
-
- batchp = NULL;
- if (messagep->batch_id != 0) {
- batchp = batch_by_id(env, messagep->batch_id);
- if (batchp == NULL)
- fatalx("batch_record: internal inconsistency.");
- }
- if (batchp == NULL) {
- batchp = calloc(1, sizeof(struct batch));
- if (batchp == NULL)
- fatal("batch_record: calloc");
-
- batchp->id = generate_uid();
-
- (void)strlcpy(batchp->message_id, messagep->message_id,
- sizeof(batchp->message_id));
- TAILQ_INIT(&batchp->messages);
- SPLAY_INSERT(batchtree, &env->batch_queue, batchp);
-
- if (messagep->type & T_BOUNCE_MESSAGE) {
- batchp->type = T_BOUNCE_BATCH;
- path = &messagep->sender;
- }
- else {
- path = &messagep->recipient;
- }
- batchp->rule = path->rule;
-
- (void)strlcpy(batchp->hostname, path->domain,
- sizeof(batchp->hostname));
-
- if (batchp->type != T_BOUNCE_BATCH) {
- if (IS_MAILBOX(*path) || IS_EXT(*path)) {
- batchp->type = T_MDA_BATCH;
- }
- else {
- batchp->type = T_MTA_BATCH;
- }
- }
- }
-
- TAILQ_INSERT_TAIL(&batchp->messages, messagep, entry);
- env->stats->runner.active++;
- return batchp;
-}
-
-struct batch *
-batch_lookup(struct smtpd *env, struct message *message)
-{
- struct batch *batchp;
- struct batch lookup;
-
- /* We only support delivery of one message at a time, in MDA
- * and bounces messages.
- */
- if (message->type == T_BOUNCE_MESSAGE || message->type == T_MDA_MESSAGE)
- return NULL;
-
- /* If message->batch_id != 0, we can retrieve batch by id */
- if (message->batch_id != 0) {
- lookup.id = message->batch_id;
- return SPLAY_FIND(batchtree, &env->batch_queue, &lookup);
- }
-
- /* We do not know the batch_id yet, maybe it was created but we could not
- * be notified, or it just does not exist. Let's scan to see if we can do
- * a match based on our message_id and flags.
- */
- SPLAY_FOREACH(batchp, batchtree, &env->batch_queue) {
-
- if (batchp->type != message->type)
- continue;
-
- if (strcasecmp(batchp->message_id, message->message_id) != 0)
- continue;
-
- if (batchp->type & T_MTA_BATCH)
- if (strcasecmp(batchp->hostname, message->recipient.domain) != 0)
- continue;
-
- break;
- }
-
- return batchp;
-}
-
-int
-batch_cmp(struct batch *s1, struct batch *s2)
-{
- /*
- * do not return u_int64_t's
- */
- if (s1->id < s2->id)
- return (-1);
-
- if (s1->id > s2->id)
- return (1);
-
- return (0);
-}
-
-int
-runner_check_loop(struct message *messagep)
-{
- int fd;
- FILE *fp;
- char *buf, *lbuf;
- size_t len;
- struct path chkpath;
- int ret = 0;
- int rcvcount = 0;
-
- fd = queue_open_message_file(messagep->message_id);
- if ((fp = fdopen(fd, "r")) == NULL)
- fatal("fdopen");
-
- lbuf = NULL;
- while ((buf = fgetln(fp, &len))) {
- if (buf[len - 1] == '\n')
- buf[len - 1] = '\0';
- else {
- /* EOF without EOL, copy and add the NUL */
- if ((lbuf = malloc(len + 1)) == NULL)
- err(1, NULL);
- memcpy(lbuf, buf, len);
- lbuf[len] = '\0';
- buf = lbuf;
- }
-
- if (strchr(buf, ':') == NULL && !isspace((int)*buf))
- break;
-
- if (strncasecmp("Received: ", buf, 10) == 0) {
- rcvcount++;
- if (rcvcount == MAX_HOPS_COUNT) {
- ret = 1;
- break;
- }
- }
-
- else if (strncasecmp("Delivered-To: ", buf, 14) == 0) {
- struct path rcpt;
-
- bzero(&chkpath, sizeof (struct path));
- if (! recipient_to_path(&chkpath, buf + 14))
- continue;
-
- rcpt = messagep->recipient;
- if (messagep->type == T_BOUNCE_MESSAGE)
- rcpt = messagep->sender;
-
- if (strcasecmp(chkpath.user, rcpt.user) == 0 &&
- strcasecmp(chkpath.domain, rcpt.domain) == 0) {
- ret = 1;
- break;
- }
- }
- }
- free(lbuf);
-
- fclose(fp);
- return ret;
-}
-
-void
-message_reset_flags(struct message *m)
-{
- m->flags &= ~F_MESSAGE_SCHEDULED;
- m->flags &= ~F_MESSAGE_PROCESSING;
-
- while (! queue_update_envelope(m))
- sleep(1);
-}
-
-SPLAY_GENERATE(batchtree, batch, b_nodes, batch_cmp);