diff options
author | Eric Faurot <eric@cvs.openbsd.org> | 2013-10-27 11:01:48 +0000 |
---|---|---|
committer | Eric Faurot <eric@cvs.openbsd.org> | 2013-10-27 11:01:48 +0000 |
commit | 8ca395d4713666bd6faa4b4c546ef78d5c348fc5 (patch) | |
tree | 45347e2082c65c8a8c2861197569cfed599798b0 /usr.sbin/smtpd/mfa_session.c | |
parent | 615c989e409f084e205249da90531e8a9a2de063 (diff) |
Make the filter infrastructure move forward.
This is a work-in-progress and it's not supposed to be useable for now.
Diffstat (limited to 'usr.sbin/smtpd/mfa_session.c')
-rw-r--r-- | usr.sbin/smtpd/mfa_session.c | 351 |
1 files changed, 238 insertions, 113 deletions
diff --git a/usr.sbin/smtpd/mfa_session.c b/usr.sbin/smtpd/mfa_session.c index 2c212b6baba..312aedf650f 100644 --- a/usr.sbin/smtpd/mfa_session.c +++ b/usr.sbin/smtpd/mfa_session.c @@ -1,4 +1,4 @@ -/* $OpenBSD: mfa_session.c,v 1.19 2013/10/26 12:27:59 eric Exp $ */ +/* $OpenBSD: mfa_session.c,v 1.20 2013/10/27 11:01:47 eric Exp $ */ /* * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org> @@ -52,22 +52,27 @@ enum { QUERY_DONE }; -struct mfa_filter { - TAILQ_ENTRY(mfa_filter) entry; + +struct mfa_filterproc { + TAILQ_ENTRY(mfa_filterproc) entry; struct mproc mproc; int hooks; int flags; int ready; }; -struct mfa_filter_chain { - TAILQ_HEAD(, mfa_filter) filters; +struct mfa_filter { + TAILQ_ENTRY(mfa_filter) entry; + struct mfa_filterproc *proc; }; +TAILQ_HEAD(mfa_filters, mfa_filter); struct mfa_session { - uint64_t id; - int terminate; - TAILQ_HEAD(mfa_queries, mfa_query) queries; + uint64_t id; + int terminate; + TAILQ_HEAD(mfa_queries, mfa_query) queries; + struct mfa_filters *filters; + struct mfa_filter *fcurr; }; struct mfa_query { @@ -91,6 +96,7 @@ struct mfa_query { } connect; char line[SMTPD_MAXLINESIZE]; struct mailaddr maddr; + size_t datalen; } u; /* current response */ @@ -105,11 +111,14 @@ static void mfa_filter_imsg(struct mproc *, struct imsg *); static struct mfa_query *mfa_query(struct mfa_session *, int, int); static void mfa_drain_query(struct mfa_query *); static void mfa_run_query(struct mfa_filter *, struct mfa_query *); -static void mfa_run_data(struct mfa_filter *, uint64_t, const char *); -static struct mfa_filter_chain chain; +static void mfa_set_fdout(struct mfa_session *, int); + +static TAILQ_HEAD(, mfa_filterproc) procs; +struct dict chains; static const char * mfa_query_to_text(struct mfa_query *); static const char * mfa_filter_to_text(struct mfa_filter *); +static const char * mfa_filterproc_to_text(struct mfa_filterproc *); static const char * type_to_str(int); static const char * hook_to_str(int); static const char * 
status_to_str(int); @@ -118,32 +127,125 @@ static const char * filterimsg_to_str(int); struct tree sessions; struct tree queries; + +static void +mfa_extend_chain(struct mfa_filters *chain, const char *name) +{ + struct mfa_filter *n; + struct mfa_filters *fchain; + struct filter *fconf; + int i; + + fconf = dict_xget(&env->sc_filters, name); + if (fconf->chain) { + log_debug("mfa: extending with \"%s\"", name); + for (i = 0; i < MAX_FILTER_PER_CHAIN; i++) { + if (!fconf->filters[i][0]) + break; + mfa_extend_chain(chain, fconf->filters[i]); + } + } + else { + log_debug("mfa: adding filter \"%s\"", name); + n = xcalloc(1, sizeof(*n), "mfa_extend_chain"); + fchain = dict_get(&chains, name); + n->proc = TAILQ_FIRST(fchain)->proc; + TAILQ_INSERT_TAIL(chain, n, entry); + } +} + void mfa_filter_prepare(void) { static int prepare = 0; struct filter *filter; void *iter; + struct mfa_filterproc *proc; + struct mfa_filters *fchain; struct mfa_filter *f; struct mproc *p; + int done, i; if (prepare) return; prepare = 1; - TAILQ_INIT(&chain.filters); + TAILQ_INIT(&procs); + dict_init(&chains); + log_debug("mfa: building simple chains..."); + + /* create all filter proc and associated chains */ iter = NULL; while (dict_iter(&env->sc_filters, &iter, NULL, (void **)&filter)) { - f = xcalloc(1, sizeof *f, "mfa_filter_init"); - p = &f->mproc; + if (filter->chain) + continue; + + log_debug("mfa: building simple chain \"%s\"", filter->name); + + proc = xcalloc(1, sizeof(*proc), "mfa_filter_init"); + p = &proc->mproc; p->handler = mfa_filter_imsg; p->proc = PROC_FILTER; p->name = xstrdup(filter->name, "mfa_filter_init"); - p->data = f; + p->data = proc; if (mproc_fork(p, filter->path, filter->name) < 0) fatalx("mfa_filter_init"); - TAILQ_INSERT_TAIL(&chain.filters, f, entry); + + log_debug("mfa: registering proc \"%s\"", filter->name); + + f = xcalloc(1, sizeof(*f), "mfa_filter_init"); + f->proc = proc; + + TAILQ_INSERT_TAIL(&procs, proc, entry); + fchain = xcalloc(1, sizeof(*fchain), 
"mfa_filter_prepare"); + TAILQ_INIT(fchain); + TAILQ_INSERT_TAIL(fchain, f, entry); + dict_xset(&chains, filter->name, fchain); + filter->done = 1; + } + + log_debug("mfa: building complex chains..."); + + /* resolve all chains */ + done = 0; + while (!done) { + done = 1; + iter = NULL; + while (dict_iter(&env->sc_filters, &iter, NULL, (void **)&filter)) { + if (filter->done) + continue; + done = 0; + filter->done = 1; + for (i = 0; i < MAX_FILTER_PER_CHAIN; i++) { + if (!filter->filters[i][0]) + break; + if (!dict_get(&chains, filter->filters[i])) { + filter->done = 0; + break; + } + } + if (filter->done == 0) + continue; + fchain = xcalloc(1, sizeof(*fchain), "mfa_filter_prepare"); + TAILQ_INIT(fchain); + log_debug("mfa: building chain \"%s\"...", filter->name); + for (i = 0; i < MAX_FILTER_PER_CHAIN; i++) { + if (!filter->filters[i][0]) + break; + mfa_extend_chain(fchain, filter->filters[i]); + } + log_debug("mfa: done building chain \"%s\"", filter->name); + dict_xset(&chains, filter->name, fchain); + } + } + log_debug("mfa: done building complex chains"); + + if (dict_get(&chains, "default") == NULL) { + log_debug("mfa: done building default chain"); + fchain = xcalloc(1, sizeof(*fchain), "mfa_filter_prepare"); + TAILQ_INIT(fchain); + dict_xset(&chains, "default", fchain); } } @@ -151,8 +253,7 @@ void mfa_filter_init(void) { static int init = 0; - struct mfa_filter *f; - struct mproc *p; + struct mfa_filterproc *p; if (init) return; @@ -161,15 +262,15 @@ mfa_filter_init(void) tree_init(&sessions); tree_init(&queries); - TAILQ_FOREACH(f, &chain.filters, entry) { - p = &f->mproc; - m_create(p, IMSG_FILTER_REGISTER, 0, 0, -1); - m_add_u32(p, FILTER_API_VERSION); - m_close(p); - mproc_enable(p); + TAILQ_FOREACH(p, &procs, entry) { + m_create(&p->mproc, IMSG_FILTER_REGISTER, 0, 0, -1); + m_add_u32(&p->mproc, FILTER_API_VERSION); + m_add_string(&p->mproc, p->mproc.name); + m_close(&p->mproc); + mproc_enable(&p->mproc); } - if (TAILQ_FIRST(&chain.filters) == NULL) + 
if (TAILQ_FIRST(&procs) == NULL) mfa_ready(); } @@ -182,6 +283,7 @@ mfa_filter_connect(uint64_t id, const struct sockaddr *local, s = xcalloc(1, sizeof(*s), "mfa_query_connect"); s->id = id; + s->filters = dict_xget(&chains, "default"); TAILQ_INIT(&s->queries); tree_xset(&sessions, s->id, s); @@ -244,63 +346,64 @@ mfa_filter_line(uint64_t id, int hook, const char *line) } void -mfa_filter(uint64_t id, int hook) +mfa_filter_eom(uint64_t id, int hook, size_t datalen) { struct mfa_session *s; struct mfa_query *q; s = tree_xget(&sessions, id); q = mfa_query(s, QT_QUERY, hook); + q->u.datalen = datalen; mfa_drain_query(q); } void -mfa_filter_data(uint64_t id, const char *line) +mfa_filter(uint64_t id, int hook) { - mfa_run_data(TAILQ_FIRST(&chain.filters), id, line); + struct mfa_session *s; + struct mfa_query *q; + + s = tree_xget(&sessions, id); + q = mfa_query(s, QT_QUERY, hook); + + mfa_drain_query(q); } static void -mfa_run_data(struct mfa_filter *f, uint64_t id, const char *line) +mfa_set_fdout(struct mfa_session *s, int fdout) { struct mproc *p; - log_trace(TRACE_MFA, - "filter: running data for %016"PRIx64" on filter %p: %s", id, f, line); - - /* Send the dataline to the filters that want to see it. */ - while (f) { - if (f->hooks & HOOK_DATALINE) { - p = &f->mproc; - m_create(p, IMSG_FILTER_DATA, 0, 0, -1); - m_add_id(p, id); - m_add_string(p, line); + while(s->fcurr) { + if (s->fcurr->proc->hooks & HOOK_DATALINE) { + log_trace(TRACE_MFA, "mfa: sending fd %d to %s", fdout, mfa_filter_to_text(s->fcurr)); + p = &s->fcurr->proc->mproc; + m_create(p, IMSG_FILTER_PIPE_SETUP, 0, 0, fdout); + m_add_id(p, s->id); m_close(p); - - /* - * If this filter wants to alter data, we stop - * iterating here, and the filter becomes responsible - * for sending datalines back. 
- */ - if (f->flags & FILTER_ALTERDATA) { - log_trace(TRACE_MFA, - "filter: expect datalines from filter %s", - mfa_filter_to_text(f)); - return; - } + return; } - f = TAILQ_NEXT(f, entry); + s->fcurr = TAILQ_PREV(s->fcurr, mfa_filters, entry); } - /* When all filters are done, send the line back to the smtp process. */ - log_trace(TRACE_MFA, - "filter: sending final data to smtp for %016"PRIx64" on filter %p: %s", id, f, line); + log_trace(TRACE_MFA, "mfa: chain input is %d", fdout); - m_create(p_smtp, IMSG_MFA_SMTP_DATA, 0, 0, -1); - m_add_id(p_smtp, id); - m_add_string(p_smtp, line); + m_create(p_smtp, IMSG_QUEUE_MESSAGE_FILE, 0, 0, fdout); + m_add_id(p_smtp, s->id); + m_add_int(p_smtp, 1); m_close(p_smtp); + return; +} + +void +mfa_build_fd_chain(uint64_t id, int fdout) +{ + struct mfa_session *s; + + s = tree_xget(&sessions, id); + s->fcurr = TAILQ_LAST(s->filters, mfa_filters); + mfa_set_fdout(s, fdout); } static struct mfa_query * @@ -317,7 +420,7 @@ mfa_query(struct mfa_session *s, int type, int hook) TAILQ_INSERT_TAIL(&s->queries, q, entry); q->state = QUERY_READY; - q->current = TAILQ_FIRST(&chain.filters); + q->current = TAILQ_FIRST(s->filters); q->hasrun = 0; log_trace(TRACE_MFA, "filter: new query %s %s", type_to_str(type), @@ -329,7 +432,7 @@ mfa_query(struct mfa_session *s, int type, int hook) static void mfa_drain_query(struct mfa_query *q) { - struct mfa_filter *f; + struct mfa_filterproc *proc; struct mfa_query *prev; log_trace(TRACE_MFA, "filter: draining query %s", mfa_query_to_text(q)); @@ -377,7 +480,7 @@ mfa_drain_query(struct mfa_query *q) if (q->type == QT_QUERY) { log_trace(TRACE_MFA, - "filter: query 0x%016"PRIx64" done: " + "filter: query %016"PRIx64" done: " "status=%s code=%d response=\"%s\"", q->qid, status_to_str(q->smtp.status), @@ -385,11 +488,11 @@ mfa_drain_query(struct mfa_query *q) q->smtp.response); /* Done, notify all listeners and return smtp response */ - while (tree_poproot(&q->notify, NULL, (void**)&f)) { - 
m_create(&f->mproc, IMSG_FILTER_NOTIFY, 0, 0, -1); - m_add_id(&f->mproc, q->qid); - m_add_int(&f->mproc, q->smtp.status); - m_close(&f->mproc); + while (tree_poproot(&q->notify, NULL, (void**)&proc)) { + m_create(&proc->mproc, IMSG_FILTER_NOTIFY, 0, 0, -1); + m_add_id(&proc->mproc, q->qid); + m_add_int(&proc->mproc, q->smtp.status); + m_close(&proc->mproc); } m_create(p_smtp, IMSG_MFA_SMTP_RESPONSE, 0, 0, -1); @@ -405,8 +508,9 @@ mfa_drain_query(struct mfa_query *q) TAILQ_REMOVE(&q->session->queries, q, entry); /* If the query was a disconnect event, the session can be freed */ - if (q->type == HOOK_DISCONNECT) { + if (q->hook == HOOK_DISCONNECT) { /* XXX assert prev == NULL */ + log_trace(TRACE_MFA, "filter: freeing session %016" PRIx64, q->session->id); free(q->session); } @@ -417,7 +521,7 @@ mfa_drain_query(struct mfa_query *q) static void mfa_run_query(struct mfa_filter *f, struct mfa_query *q) { - if ((f->hooks & q->hook) == 0) { + if ((f->proc->hooks & q->hook) == 0) { log_trace(TRACE_MFA, "filter: skipping filter %s for query %s", mfa_filter_to_text(f), mfa_query_to_text(q)); return; @@ -427,100 +531,96 @@ mfa_run_query(struct mfa_filter *f, struct mfa_query *q) mfa_filter_to_text(f), mfa_query_to_text(q)); if (q->type == QT_QUERY) { - m_create(&f->mproc, IMSG_FILTER_QUERY, 0, 0, -1); - m_add_id(&f->mproc, q->session->id); - m_add_id(&f->mproc, q->qid); - m_add_int(&f->mproc, q->hook); + m_create(&f->proc->mproc, IMSG_FILTER_QUERY, 0, 0, -1); + m_add_id(&f->proc->mproc, q->session->id); + m_add_id(&f->proc->mproc, q->qid); + m_add_int(&f->proc->mproc, q->hook); switch (q->hook) { case HOOK_CONNECT: - m_add_sockaddr(&f->mproc, + m_add_sockaddr(&f->proc->mproc, (struct sockaddr *)&q->u.connect.local); - m_add_sockaddr(&f->mproc, + m_add_sockaddr(&f->proc->mproc, (struct sockaddr *)&q->u.connect.remote); - m_add_string(&f->mproc, q->u.connect.hostname); + m_add_string(&f->proc->mproc, q->u.connect.hostname); break; case HOOK_HELO: - m_add_string(&f->mproc, 
q->u.line); + m_add_string(&f->proc->mproc, q->u.line); break; case HOOK_MAIL: case HOOK_RCPT: - m_add_mailaddr(&f->mproc, &q->u.maddr); + m_add_mailaddr(&f->proc->mproc, &q->u.maddr); + break; + case HOOK_EOM: + m_add_u32(&f->proc->mproc, q->u.datalen); break; default: break; } - - m_close(&f->mproc); + m_close(&f->proc->mproc); tree_xset(&queries, q->qid, q); q->state = QUERY_RUNNING; } else { - m_create(&f->mproc, IMSG_FILTER_EVENT, 0, 0, -1); - m_add_id(&f->mproc, q->session->id); - m_add_int(&f->mproc, q->hook); - m_close(&f->mproc); + m_create(&f->proc->mproc, IMSG_FILTER_EVENT, 0, 0, -1); + m_add_id(&f->proc->mproc, q->session->id); + m_add_int(&f->proc->mproc, q->hook); + m_close(&f->proc->mproc); } } static void mfa_filter_imsg(struct mproc *p, struct imsg *imsg) { - struct mfa_filter *f; + struct mfa_filterproc *proc = p->data; + struct mfa_session *s; struct mfa_query *q, *next; struct msg m; const char *line; - uint64_t id, qid; - int status, code, notify; - - f = p->data; + uint64_t qid; + uint32_t datalen; + int qhook, status, code, notify; if (imsg == NULL) { - log_warnx("warn: filter \"%s\" closed unexpectedly", - p->name); + log_warnx("warn: filter \"%s\" closed unexpectedly", p->name); fatalx("exiting"); } - log_trace(TRACE_MFA, "filter: imsg %s from filter %s", + log_trace(TRACE_MFA, "filter: imsg %s from procfilter %s", filterimsg_to_str(imsg->hdr.type), - mfa_filter_to_text(f)); + mfa_filterproc_to_text(proc)); switch (imsg->hdr.type) { case IMSG_FILTER_REGISTER: - if (f->ready) { + if (proc->ready) { log_warnx("warn: filter \"%s\" already registered", - f->mproc.name); + proc->mproc.name); exit(1); } m_msg(&m, imsg); - m_get_int(&m, &f->hooks); - m_get_int(&m, &f->flags); + m_get_int(&m, &proc->hooks); + m_get_int(&m, &proc->flags); m_end(&m); - f->ready = 1; + proc->ready = 1; log_debug("debug: filter \"%s\": hooks 0x%08x flags 0x%04x", - f->mproc.name, f->hooks, f->flags); + proc->mproc.name, proc->hooks, proc->flags); - TAILQ_FOREACH(f, 
&chain.filters, entry) - if (!f->ready) + TAILQ_FOREACH(proc, &procs, entry) + if (!proc->ready) return; mfa_ready(); break; - case IMSG_FILTER_DATA: - m_msg(&m, imsg); - m_get_id(&m, &id); - m_get_string(&m, &line); - m_end(&m); - mfa_run_data(TAILQ_NEXT(f, entry), id, line); - break; - case IMSG_FILTER_RESPONSE: m_msg(&m, imsg); m_get_id(&m, &qid); + m_get_int(&m, &qhook); + if (qhook == HOOK_EOM) + m_get_u32(&m, &datalen); m_get_int(&m, &status); m_get_int(&m, &code); m_get_int(&m, &notify); @@ -528,10 +628,13 @@ mfa_filter_imsg(struct mproc *p, struct imsg *imsg) line = NULL; else m_get_string(&m, &line); - m_end(&m); q = tree_xpop(&queries, qid); + if (q->hook != qhook) { + log_warnx("warn: mfa: hook mismatch %d != %d", q->hook, qhook); + fatalx("exiting"); + } q->smtp.status = status; if (code) q->smtp.code = code; @@ -541,7 +644,9 @@ mfa_filter_imsg(struct mproc *p, struct imsg *imsg) } q->state = (status == FILTER_OK) ? QUERY_READY : QUERY_DONE; if (notify) - tree_xset(&q->notify, (uintptr_t)(f), f); + tree_xset(&q->notify, (uintptr_t)(proc), proc); + if (qhook == HOOK_EOM) + q->u.datalen = datalen; next = TAILQ_NEXT(q, entry); mfa_drain_query(q); @@ -554,8 +659,18 @@ mfa_filter_imsg(struct mproc *p, struct imsg *imsg) mfa_drain_query(next); break; + case IMSG_FILTER_PIPE_SETUP: + m_msg(&m, imsg); + m_get_id(&m, &qid); + m_end(&m); + + s = tree_xget(&sessions, qid); + s->fcurr = TAILQ_PREV(s->fcurr, mfa_filters, entry); + mfa_set_fdout(s, imsg->fd); + break; + default: - log_warnx("bad imsg from filter %s", p->name); + log_warnx("warn: bad imsg from filter %s", p->name); exit(1); } } @@ -606,12 +721,21 @@ mfa_filter_to_text(struct mfa_filter *f) { static char buf[1024]; - snprintf(buf, sizeof buf, "%s[hooks=0x%04x,flags=0x%x]", - f->mproc.name, f->hooks, f->flags); + snprintf(buf, sizeof buf, "filter:%s", mfa_filterproc_to_text(f->proc)); return (buf); } +static const char * +mfa_filterproc_to_text(struct mfa_filterproc *proc) +{ + static char buf[1024]; + 
snprintf(buf, sizeof buf, "%s[hooks=0x%08x,flags=0x%04x]", + proc->mproc.name, proc->hooks, proc->flags); + + return (buf); +} #define CASE(x) case x : return #x @@ -622,8 +746,9 @@ filterimsg_to_str(int imsg) CASE(IMSG_FILTER_REGISTER); CASE(IMSG_FILTER_EVENT); CASE(IMSG_FILTER_QUERY); + CASE(IMSG_FILTER_PIPE_SETUP); + CASE(IMSG_FILTER_PIPE_ABORT); CASE(IMSG_FILTER_NOTIFY); - CASE(IMSG_FILTER_DATA); CASE(IMSG_FILTER_RESPONSE); default: return "IMSG_FILTER_???"; |