diff options
Diffstat (limited to 'usr.sbin/smtpd/scheduler.c')
-rw-r--r-- | usr.sbin/smtpd/scheduler.c | 454 |
1 files changed, 174 insertions, 280 deletions
diff --git a/usr.sbin/smtpd/scheduler.c b/usr.sbin/smtpd/scheduler.c index 490e4db546e..9efaf8029a4 100644 --- a/usr.sbin/smtpd/scheduler.c +++ b/usr.sbin/smtpd/scheduler.c @@ -1,9 +1,10 @@ -/* $OpenBSD: scheduler.c,v 1.7 2012/07/18 22:04:49 eric Exp $ */ +/* $OpenBSD: scheduler.c,v 1.8 2012/08/08 08:50:42 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> * Copyright (c) 2008-2009 Jacek Masiulaniec <jacekm@dobremiasto.net> + * Copyright (c) 2012 Eric Faurot <eric@openbsd.org> * * Permission to use, copy, modify, and distribute this software for any * purpose with or without fee is hereby granted, provided that the above @@ -47,17 +48,18 @@ static void scheduler_imsg(struct imsgev *, struct imsg *); static void scheduler_shutdown(void); static void scheduler_sig_handler(int, short, void *); -static void scheduler_setup_events(void); static void scheduler_reset_events(void); -static void scheduler_disable_events(void); static void scheduler_timeout(int, short, void *); -static void scheduler_remove(u_int64_t); -static void scheduler_remove_envelope(u_int64_t); -static int scheduler_process_envelope(u_int64_t); -static int scheduler_process_batch(enum delivery_type, u_int64_t); -static int scheduler_check_loop(struct envelope *); +static void scheduler_process_remove(struct scheduler_batch *); +static void scheduler_process_expire(struct scheduler_batch *); +static void scheduler_process_bounce(struct scheduler_batch *); +static void scheduler_process_mda(struct scheduler_batch *); +static void scheduler_process_mta(struct scheduler_batch *); static int scheduler_load_message(u_int32_t); +void scheduler_envelope_update(struct envelope *); +void scheduler_envelope_delete(struct envelope *); + static struct scheduler_backend *backend = NULL; extern const char *backend_scheduler; @@ -84,8 +86,9 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg) e = imsg->data; log_trace(TRACE_SCHEDULER, "scheduler: IMSG_QUEUE_DELIVERY_OK: %016"PRIx64, e->id); - backend->remove(e->id); + backend->delete(e->id); queue_envelope_delete(e); + scheduler_reset_events(); return; case IMSG_QUEUE_DELIVERY_TEMPFAIL: @@ -96,7 +99,7 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg) e->retry++; queue_envelope_update(e); scheduler_info(&si, e); - backend->insert(&si); + backend->update(&si); scheduler_reset_events(); return; @@ -109,10 +112,11 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg) bounce_record_message(e, &bounce); scheduler_info(&si, &bounce); backend->insert(&si); - scheduler_reset_events(); + backend->commit(evpid_to_msgid(bounce.id)); } - backend->remove(e->id); + backend->delete(e->id); queue_envelope_delete(e); + scheduler_reset_events(); return; case IMSG_MDA_SESS_NEW: @@ -138,7 +142,7 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg) if (imsg->fd < 0 || !bounce_session(imsg->fd, e)) { queue_envelope_update(e); scheduler_info(&si, e); - backend->insert(&si); + backend->update(&si); scheduler_reset_events(); return; } @@ -175,24 +179,23 @@ scheduler_imsg(struct imsgev *iev, struct imsg *imsg) log_trace(TRACE_SCHEDULER, "scheduler: IMSG_SCHEDULER_SCHEDULE: %016"PRIx64, *(u_int64_t *)imsg->data); - backend->force(*(u_int64_t *)imsg->data); - scheduler_reset_events(); + backend->schedule(*(u_int64_t *)imsg->data); + scheduler_reset_events(); return; case IMSG_SCHEDULER_REMOVE: log_trace(TRACE_SCHEDULER, "scheduler: IMSG_SCHEDULER_REMOVE: %016"PRIx64, *(u_int64_t *)imsg->data); - scheduler_remove(*(u_int64_t *)imsg->data); + backend->remove(*(u_int64_t *)imsg->data); scheduler_reset_events(); return; - } errx(1, "scheduler_imsg: unexpected %s imsg", imsg_to_str(imsg->hdr.type)); } -void +static void scheduler_sig_handler(int sig, short event, void *p) { switch (sig) { @@ -205,25 +208,14 @@ scheduler_sig_handler(int sig, short event, void *p) } } -void +static void scheduler_shutdown(void) { log_info("scheduler handler exiting"); _exit(0); } -void -scheduler_setup_events(void) -{ - struct timeval tv; - - evtimer_set(&env->sc_ev, scheduler_timeout, NULL); - tv.tv_sec = 0; - tv.tv_usec = 10; - evtimer_add(&env->sc_ev, &tv); -} - -void +static void scheduler_reset_events(void) { struct timeval tv; @@ -233,12 +225,6 @@ scheduler_reset_events(void) evtimer_add(&env->sc_ev, &tv); } -void -scheduler_disable_events(void) -{ - evtimer_del(&env->sc_ev); -} - pid_t scheduler(void) { @@ -304,200 +290,198 @@ scheduler(void) config_pipes(peers, nitems(peers)); config_peers(peers, nitems(peers)); - scheduler_setup_events(); + evtimer_set(&env->sc_ev, scheduler_timeout, NULL); + scheduler_reset_events(); event_dispatch(); - scheduler_disable_events(); scheduler_shutdown(); return (0); } -void +static void scheduler_timeout(int fd, short event, void *p) { - time_t nsched; - time_t curtm; - u_int64_t evpid; - static int setup = 0; - int delay = 0; - struct timeval tv; - - log_trace(TRACE_SCHEDULER, "scheduler: entering scheduler_timeout"); - - /* if we're not done setting up the scheduler, do it some more */ - if (! setup) - setup = backend->setup(); - - /* we don't have a schedulable envelope ... sleep */ - if (! backend->next(&evpid, &nsched)) - goto scheduler_sleep; - - /* is the envelope schedulable right away ? */ - curtm = time(NULL); - if (nsched <= curtm) { - /* yup */ - scheduler_process_envelope(evpid); - } - else { - /* nope, so we can either keep the timeout delay to 0 if we - * are not done setting up the scheduler, or sleep until it - * is time to schedule that envelope otherwise. - */ - if (setup) - delay = nsched - curtm; - } + struct timeval tv; + struct scheduler_batch batch; + int typemask; - if (delay) - log_trace(TRACE_SCHEDULER, "scheduler: pausing for %d seconds", - delay); - tv.tv_sec = delay; - tv.tv_usec = 0; - evtimer_add(&env->sc_ev, &tv); - return; + log_trace(TRACE_SCHEDULER, "scheduler: getting next batch"); -scheduler_sleep: - log_trace(TRACE_SCHEDULER, "scheduler: sleeping"); - return; -} - -static int -scheduler_process_envelope(u_int64_t evpid) -{ - struct envelope envelope; - size_t mta_av, mda_av, bnc_av; - struct scheduler_info si; + tv.tv_sec = 0; + tv.tv_usec = 0; - mta_av = env->sc_maxconn - stat_get(STATS_MTA_SESSION, STAT_ACTIVE); - mda_av = env->sc_maxconn - stat_get(STATS_MDA_SESSION, STAT_ACTIVE); - bnc_av = env->sc_maxconn - stat_get(STATS_SCHEDULER_BOUNCES, STAT_ACTIVE); + typemask = SCHED_REMOVE | SCHED_EXPIRE | SCHED_BOUNCE; + if (!(env->sc_flags & SMTPD_MDA_PAUSED)) + typemask |= SCHED_MDA; + if (!(env->sc_flags & SMTPD_MTA_PAUSED)) + typemask |= SCHED_MTA; - if (! queue_envelope_load(evpid, &envelope)) - return 0; + backend->batch(typemask, time(NULL), &batch); + switch (batch.type) { + case SCHED_NONE: + log_trace(TRACE_SCHEDULER, "scheduler: sleeping"); + return; - if (envelope.type == D_MDA) - if (mda_av == 0) { - env->sc_flags |= SMTPD_MDA_BUSY; - return 0; - } + case SCHED_DELAY: + tv.tv_sec = batch.delay; + log_trace(TRACE_SCHEDULER, + "scheduler: pausing for %li seconds", tv.tv_sec); + break; - if (envelope.type == D_MTA) - if (mta_av == 0) { - env->sc_flags |= SMTPD_MTA_BUSY; - return 0; - } + case SCHED_REMOVE: + scheduler_process_remove(&batch); + break; - if (envelope.type == D_BOUNCE) - if (bnc_av == 0) { - env->sc_flags |= SMTPD_BOUNCE_BUSY; - return 0; - } + case SCHED_EXPIRE: + scheduler_process_expire(&batch); + break; - if (scheduler_check_loop(&envelope)) { - struct envelope bounce; + case SCHED_BOUNCE: + scheduler_process_bounce(&batch); + break; - envelope_set_errormsg(&envelope, "loop has been detected"); - if (bounce_record_message(&envelope, &bounce)) { - scheduler_info(&si, &bounce); - backend->insert(&si); - } - backend->remove(evpid); - queue_envelope_delete(&envelope); + case SCHED_MDA: + scheduler_process_mda(&batch); + break; - scheduler_reset_events(); + case SCHED_MTA: + scheduler_process_mta(&batch); + break; - return 0; + default: + fatalx("scheduler_timeout: unknown batch type"); } + evtimer_add(&env->sc_ev, &tv); +} - return scheduler_process_batch(envelope.type, evpid); +static void +scheduler_process_remove(struct scheduler_batch *batch) +{ + struct envelope evp; + struct id_list *e; + + while ((e = batch->evpids)) { + batch->evpids = e->next; + log_debug("scheduler: deleting evp:%016" PRIx64 " (removed)", + e->id); + evp.id = e->id; + queue_envelope_delete(&evp); + free(e); + } } -static int -scheduler_process_batch(enum delivery_type type, u_int64_t evpid) +static void +scheduler_process_expire(struct scheduler_batch *batch) { struct envelope evp; - void *batch; - int fd; - - batch = backend->batch(evpid); - switch (type) { - case D_BOUNCE: - while (backend->fetch(batch, &evpid)) { - if (! queue_envelope_load(evpid, &evp)) - goto end; - - evp.lasttry = time(NULL); - imsg_compose_event(env->sc_ievs[PROC_QUEUE], - IMSG_SMTP_ENQUEUE, PROC_SMTP, 0, -1, &evp, - sizeof evp); - backend->schedule(evpid); - } + struct id_list *e; + + while ((e = batch->evpids)) { + batch->evpids = e->next; + log_debug("scheduler: deleting evp:%016" PRIx64 " (expire)", + e->id); + evp.id = e->id; + queue_envelope_delete(&evp); + free(e); + } +} + +static void +scheduler_process_bounce(struct scheduler_batch *batch) +{ + struct envelope evp; + struct id_list *e; + + while ((e = batch->evpids)) { + batch->evpids = e->next; + log_debug("scheduler: scheduling evp:%016" PRIx64 " (bounce)", + e->id); + queue_envelope_load(e->id, &evp); + evp.lasttry = time(NULL); + imsg_compose_event(env->sc_ievs[PROC_QUEUE], + IMSG_SMTP_ENQUEUE, PROC_SMTP, 0, -1, &evp, + sizeof evp); stat_increment(STATS_SCHEDULER); stat_increment(STATS_SCHEDULER_BOUNCES); - break; - - case D_MDA: - backend->fetch(batch, &evpid); - if (! queue_envelope_load(evpid, &evp)) - goto end; - + free(e); + } +} + +static void +scheduler_process_mda(struct scheduler_batch *batch) +{ + struct envelope evp; + struct id_list *e; + int fd; + + while ((e = batch->evpids)) { + batch->evpids = e->next; + log_debug("scheduler: scheduling evp:%016" PRIx64 " (mda)", + e->id); + queue_envelope_load(e->id, &evp); evp.lasttry = time(NULL); - fd = queue_message_fd_r(evpid_to_msgid(evpid)); + fd = queue_message_fd_r(evpid_to_msgid(evp.id)); imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_MDA_SESS_NEW, PROC_MDA, 0, fd, &evp, sizeof evp); - backend->schedule(evpid); - stat_increment(STATS_SCHEDULER); stat_increment(STATS_MDA_SESSION); - break; + free(e); + } +} - case D_MTA: { - struct mta_batch mta_batch; +static void +scheduler_process_mta(struct scheduler_batch *batch) +{ + struct envelope evp; + struct mta_batch mta_batch; + struct id_list *e; - /* FIXME */ - if (! backend->fetch(batch, &evpid)) - goto end; - if (! queue_envelope_load(evpid, &evp)) - goto end; + queue_envelope_load(batch->evpids->id, &evp); - bzero(&mta_batch, sizeof mta_batch); - mta_batch.id = arc4random(); - mta_batch.relay = evp.agent.mta.relay; + bzero(&mta_batch, sizeof mta_batch); + mta_batch.id = arc4random(); + mta_batch.relay = evp.agent.mta.relay; - imsg_compose_event(env->sc_ievs[PROC_QUEUE], - IMSG_BATCH_CREATE, PROC_MTA, 0, -1, &mta_batch, - sizeof mta_batch); + imsg_compose_event(env->sc_ievs[PROC_QUEUE], + IMSG_BATCH_CREATE, PROC_MTA, 0, -1, &mta_batch, + sizeof mta_batch); - while (backend->fetch(batch, &evpid)) { - if (! queue_envelope_load(evpid, &evp)) - goto end; - evp.lasttry = time(NULL); /* FIXME */ - evp.batch_id = mta_batch.id; + while ((e = batch->evpids)) { + batch->evpids = e->next; + log_debug("scheduler: scheduling evp:%016" PRIx64 " (mta)", + e->id); + queue_envelope_load(e->id, &evp); + evp.lasttry = time(NULL); + evp.batch_id = mta_batch.id; + imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_BATCH_APPEND, + PROC_MTA, 0, -1, &evp, sizeof evp); + free(e); + stat_increment(STATS_SCHEDULER); + } - imsg_compose_event(env->sc_ievs[PROC_QUEUE], - IMSG_BATCH_APPEND, PROC_MTA, 0, -1, &evp, - sizeof evp); + imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_BATCH_CLOSE, + PROC_MTA, 0, -1, &mta_batch, sizeof mta_batch); - backend->schedule(evpid); - stat_increment(STATS_SCHEDULER); - } + stat_increment(STATS_MTA_SESSION); +} - imsg_compose_event(env->sc_ievs[PROC_QUEUE], - IMSG_BATCH_CLOSE, PROC_MTA, 0, -1, &mta_batch, - sizeof mta_batch); +void +scheduler_envelope_update(struct envelope *e) +{ + struct scheduler_info si; - stat_increment(STATS_MTA_SESSION); - break; - } - - default: - fatalx("scheduler_process_batchqueue: unknown type"); - } + scheduler_info(&si, e); + backend->update(&si); + scheduler_reset_events(); +} -end: - backend->close(batch); - return 1; +void +scheduler_envelope_delete(struct envelope *e) +{ + backend->delete(e->id); + scheduler_reset_events(); } static int @@ -506,7 +490,7 @@ scheduler_load_message(u_int32_t msgid) struct qwalk *q; u_int64_t evpid; struct envelope envelope; - struct scheduler_info si; + struct scheduler_info si; q = qwalk_new(msgid); while (qwalk(q, &evpid)) { @@ -515,98 +499,8 @@ scheduler_load_message(u_int32_t msgid) scheduler_info(&si, &envelope); backend->insert(&si); } - qwalk_close(q); + qwalk_close(q); + backend->commit(msgid); return 1; } - -static int -scheduler_check_loop(struct envelope *ep) -{ - int fd; - FILE *fp; - char *buf, *lbuf; - size_t len; - struct mailaddr maddr; - int ret = 0; - int rcvcount = 0; - - fd = queue_message_fd_r(evpid_to_msgid(ep->id)); - if ((fp = fdopen(fd, "r")) == NULL) - fatal("fdopen"); - - lbuf = NULL; - while ((buf = fgetln(fp, &len))) { - if (buf[len - 1] == '\n') - buf[len - 1] = '\0'; - else { - /* EOF without EOL, copy and add the NUL */ - if ((lbuf = malloc(len + 1)) == NULL) - err(1, NULL); - memcpy(lbuf, buf, len); - lbuf[len] = '\0'; - buf = lbuf; - } - - if (strchr(buf, ':') == NULL && !isspace((int)*buf)) - break; - - if (strncasecmp("Received: ", buf, 10) == 0) { - rcvcount++; - if (rcvcount == MAX_HOPS_COUNT) { - ret = 1; - break; - } - } - - else if (strncasecmp("Delivered-To: ", buf, 14) == 0) { - struct mailaddr dest; - - bzero(&maddr, sizeof (struct mailaddr)); - if (! email_to_mailaddr(&maddr, buf + 14)) - continue; - - dest = ep->dest; - if (ep->type == D_BOUNCE) - dest = ep->sender; - - if (strcasecmp(maddr.user, dest.user) == 0 && - strcasecmp(maddr.domain, dest.domain) == 0) { - ret = 1; - break; - } - } - } - free(lbuf); - - fclose(fp); - return ret; -} - -static void -scheduler_remove(u_int64_t id) -{ - void *msg; - - /* removing by evpid */ - if (id > 0xffffffffL) { - scheduler_remove_envelope(id); - return; - } - - /* removing by msgid */ - msg = backend->message(id); - while (backend->fetch(msg, &id)) - scheduler_remove_envelope(id); - backend->close(msg); -} - -static void -scheduler_remove_envelope(u_int64_t evpid) -{ - struct envelope evp; - - evp.id = evpid; - queue_envelope_delete(&evp); - backend->remove(evpid); -} |