summary | refs | log | tree | commit | diff
path: root/usr.sbin/smtpd/mfa_session.c
diff options
context:
space:
mode:
author: Eric Faurot <eric@cvs.openbsd.org> 2013-10-27 11:01:48 +0000
committer: Eric Faurot <eric@cvs.openbsd.org> 2013-10-27 11:01:48 +0000
commit: 8ca395d4713666bd6faa4b4c546ef78d5c348fc5 (patch)
tree: 45347e2082c65c8a8c2861197569cfed599798b0 /usr.sbin/smtpd/mfa_session.c
parent: 615c989e409f084e205249da90531e8a9a2de063 (diff)
Make the filter infrastructure move forward.
This is a work-in-progress and it's not supposed to be useable for now.
Diffstat (limited to 'usr.sbin/smtpd/mfa_session.c')
-rw-r--r-- usr.sbin/smtpd/mfa_session.c 351
1 files changed, 238 insertions, 113 deletions
diff --git a/usr.sbin/smtpd/mfa_session.c b/usr.sbin/smtpd/mfa_session.c
index 2c212b6baba..312aedf650f 100644
--- a/usr.sbin/smtpd/mfa_session.c
+++ b/usr.sbin/smtpd/mfa_session.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: mfa_session.c,v 1.19 2013/10/26 12:27:59 eric Exp $ */
+/* $OpenBSD: mfa_session.c,v 1.20 2013/10/27 11:01:47 eric Exp $ */
/*
* Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
@@ -52,22 +52,27 @@ enum {
QUERY_DONE
};
-struct mfa_filter {
- TAILQ_ENTRY(mfa_filter) entry;
+
+struct mfa_filterproc {
+ TAILQ_ENTRY(mfa_filterproc) entry;
struct mproc mproc;
int hooks;
int flags;
int ready;
};
-struct mfa_filter_chain {
- TAILQ_HEAD(, mfa_filter) filters;
+struct mfa_filter {
+ TAILQ_ENTRY(mfa_filter) entry;
+ struct mfa_filterproc *proc;
};
+TAILQ_HEAD(mfa_filters, mfa_filter);
struct mfa_session {
- uint64_t id;
- int terminate;
- TAILQ_HEAD(mfa_queries, mfa_query) queries;
+ uint64_t id;
+ int terminate;
+ TAILQ_HEAD(mfa_queries, mfa_query) queries;
+ struct mfa_filters *filters;
+ struct mfa_filter *fcurr;
};
struct mfa_query {
@@ -91,6 +96,7 @@ struct mfa_query {
} connect;
char line[SMTPD_MAXLINESIZE];
struct mailaddr maddr;
+ size_t datalen;
} u;
/* current response */
@@ -105,11 +111,14 @@ static void mfa_filter_imsg(struct mproc *, struct imsg *);
static struct mfa_query *mfa_query(struct mfa_session *, int, int);
static void mfa_drain_query(struct mfa_query *);
static void mfa_run_query(struct mfa_filter *, struct mfa_query *);
-static void mfa_run_data(struct mfa_filter *, uint64_t, const char *);
-static struct mfa_filter_chain chain;
+static void mfa_set_fdout(struct mfa_session *, int);
+
+static TAILQ_HEAD(, mfa_filterproc) procs;
+struct dict chains;
static const char * mfa_query_to_text(struct mfa_query *);
static const char * mfa_filter_to_text(struct mfa_filter *);
+static const char * mfa_filterproc_to_text(struct mfa_filterproc *);
static const char * type_to_str(int);
static const char * hook_to_str(int);
static const char * status_to_str(int);
@@ -118,32 +127,125 @@ static const char * filterimsg_to_str(int);
struct tree sessions;
struct tree queries;
+
+static void
+mfa_extend_chain(struct mfa_filters *chain, const char *name)
+{
+ struct mfa_filter *n;
+ struct mfa_filters *fchain;
+ struct filter *fconf;
+ int i;
+
+ fconf = dict_xget(&env->sc_filters, name);
+ if (fconf->chain) {
+ log_debug("mfa: extending with \"%s\"", name);
+ for (i = 0; i < MAX_FILTER_PER_CHAIN; i++) {
+ if (!fconf->filters[i][0])
+ break;
+ mfa_extend_chain(chain, fconf->filters[i]);
+ }
+ }
+ else {
+ log_debug("mfa: adding filter \"%s\"", name);
+ n = xcalloc(1, sizeof(*n), "mfa_extend_chain");
+ fchain = dict_get(&chains, name);
+ n->proc = TAILQ_FIRST(fchain)->proc;
+ TAILQ_INSERT_TAIL(chain, n, entry);
+ }
+}
+
void
mfa_filter_prepare(void)
{
static int prepare = 0;
struct filter *filter;
void *iter;
+ struct mfa_filterproc *proc;
+ struct mfa_filters *fchain;
struct mfa_filter *f;
struct mproc *p;
+ int done, i;
if (prepare)
return;
prepare = 1;
- TAILQ_INIT(&chain.filters);
+ TAILQ_INIT(&procs);
+ dict_init(&chains);
+ log_debug("mfa: building simple chains...");
+
+ /* create all filter proc and associated chains */
iter = NULL;
while (dict_iter(&env->sc_filters, &iter, NULL, (void **)&filter)) {
- f = xcalloc(1, sizeof *f, "mfa_filter_init");
- p = &f->mproc;
+ if (filter->chain)
+ continue;
+
+ log_debug("mfa: building simple chain \"%s\"", filter->name);
+
+ proc = xcalloc(1, sizeof(*proc), "mfa_filter_init");
+ p = &proc->mproc;
p->handler = mfa_filter_imsg;
p->proc = PROC_FILTER;
p->name = xstrdup(filter->name, "mfa_filter_init");
- p->data = f;
+ p->data = proc;
if (mproc_fork(p, filter->path, filter->name) < 0)
fatalx("mfa_filter_init");
- TAILQ_INSERT_TAIL(&chain.filters, f, entry);
+
+ log_debug("mfa: registering proc \"%s\"", filter->name);
+
+ f = xcalloc(1, sizeof(*f), "mfa_filter_init");
+ f->proc = proc;
+
+ TAILQ_INSERT_TAIL(&procs, proc, entry);
+ fchain = xcalloc(1, sizeof(*fchain), "mfa_filter_prepare");
+ TAILQ_INIT(fchain);
+ TAILQ_INSERT_TAIL(fchain, f, entry);
+ dict_xset(&chains, filter->name, fchain);
+ filter->done = 1;
+ }
+
+ log_debug("mfa: building complex chains...");
+
+ /* resolve all chains */
+ done = 0;
+ while (!done) {
+ done = 1;
+ iter = NULL;
+ while (dict_iter(&env->sc_filters, &iter, NULL, (void **)&filter)) {
+ if (filter->done)
+ continue;
+ done = 0;
+ filter->done = 1;
+ for (i = 0; i < MAX_FILTER_PER_CHAIN; i++) {
+ if (!filter->filters[i][0])
+ break;
+ if (!dict_get(&chains, filter->filters[i])) {
+ filter->done = 0;
+ break;
+ }
+ }
+ if (filter->done == 0)
+ continue;
+ fchain = xcalloc(1, sizeof(*fchain), "mfa_filter_prepare");
+ TAILQ_INIT(fchain);
+ log_debug("mfa: building chain \"%s\"...", filter->name);
+ for (i = 0; i < MAX_FILTER_PER_CHAIN; i++) {
+ if (!filter->filters[i][0])
+ break;
+ mfa_extend_chain(fchain, filter->filters[i]);
+ }
+ log_debug("mfa: done building chain \"%s\"", filter->name);
+ dict_xset(&chains, filter->name, fchain);
+ }
+ }
+ log_debug("mfa: done building complex chains");
+
+ if (dict_get(&chains, "default") == NULL) {
+ log_debug("mfa: done building default chain");
+ fchain = xcalloc(1, sizeof(*fchain), "mfa_filter_prepare");
+ TAILQ_INIT(fchain);
+ dict_xset(&chains, "default", fchain);
}
}
@@ -151,8 +253,7 @@ void
mfa_filter_init(void)
{
static int init = 0;
- struct mfa_filter *f;
- struct mproc *p;
+ struct mfa_filterproc *p;
if (init)
return;
@@ -161,15 +262,15 @@ mfa_filter_init(void)
tree_init(&sessions);
tree_init(&queries);
- TAILQ_FOREACH(f, &chain.filters, entry) {
- p = &f->mproc;
- m_create(p, IMSG_FILTER_REGISTER, 0, 0, -1);
- m_add_u32(p, FILTER_API_VERSION);
- m_close(p);
- mproc_enable(p);
+ TAILQ_FOREACH(p, &procs, entry) {
+ m_create(&p->mproc, IMSG_FILTER_REGISTER, 0, 0, -1);
+ m_add_u32(&p->mproc, FILTER_API_VERSION);
+ m_add_string(&p->mproc, p->mproc.name);
+ m_close(&p->mproc);
+ mproc_enable(&p->mproc);
}
- if (TAILQ_FIRST(&chain.filters) == NULL)
+ if (TAILQ_FIRST(&procs) == NULL)
mfa_ready();
}
@@ -182,6 +283,7 @@ mfa_filter_connect(uint64_t id, const struct sockaddr *local,
s = xcalloc(1, sizeof(*s), "mfa_query_connect");
s->id = id;
+ s->filters = dict_xget(&chains, "default");
TAILQ_INIT(&s->queries);
tree_xset(&sessions, s->id, s);
@@ -244,63 +346,64 @@ mfa_filter_line(uint64_t id, int hook, const char *line)
}
void
-mfa_filter(uint64_t id, int hook)
+mfa_filter_eom(uint64_t id, int hook, size_t datalen)
{
struct mfa_session *s;
struct mfa_query *q;
s = tree_xget(&sessions, id);
q = mfa_query(s, QT_QUERY, hook);
+ q->u.datalen = datalen;
mfa_drain_query(q);
}
void
-mfa_filter_data(uint64_t id, const char *line)
+mfa_filter(uint64_t id, int hook)
{
- mfa_run_data(TAILQ_FIRST(&chain.filters), id, line);
+ struct mfa_session *s;
+ struct mfa_query *q;
+
+ s = tree_xget(&sessions, id);
+ q = mfa_query(s, QT_QUERY, hook);
+
+ mfa_drain_query(q);
}
static void
-mfa_run_data(struct mfa_filter *f, uint64_t id, const char *line)
+mfa_set_fdout(struct mfa_session *s, int fdout)
{
struct mproc *p;
- log_trace(TRACE_MFA,
- "filter: running data for %016"PRIx64" on filter %p: %s", id, f, line);
-
- /* Send the dataline to the filters that want to see it. */
- while (f) {
- if (f->hooks & HOOK_DATALINE) {
- p = &f->mproc;
- m_create(p, IMSG_FILTER_DATA, 0, 0, -1);
- m_add_id(p, id);
- m_add_string(p, line);
+ while(s->fcurr) {
+ if (s->fcurr->proc->hooks & HOOK_DATALINE) {
+ log_trace(TRACE_MFA, "mfa: sending fd %d to %s", fdout, mfa_filter_to_text(s->fcurr));
+ p = &s->fcurr->proc->mproc;
+ m_create(p, IMSG_FILTER_PIPE_SETUP, 0, 0, fdout);
+ m_add_id(p, s->id);
m_close(p);
-
- /*
- * If this filter wants to alter data, we stop
- * iterating here, and the filter becomes responsible
- * for sending datalines back.
- */
- if (f->flags & FILTER_ALTERDATA) {
- log_trace(TRACE_MFA,
- "filter: expect datalines from filter %s",
- mfa_filter_to_text(f));
- return;
- }
+ return;
}
- f = TAILQ_NEXT(f, entry);
+ s->fcurr = TAILQ_PREV(s->fcurr, mfa_filters, entry);
}
- /* When all filters are done, send the line back to the smtp process. */
- log_trace(TRACE_MFA,
- "filter: sending final data to smtp for %016"PRIx64" on filter %p: %s", id, f, line);
+ log_trace(TRACE_MFA, "mfa: chain input is %d", fdout);
- m_create(p_smtp, IMSG_MFA_SMTP_DATA, 0, 0, -1);
- m_add_id(p_smtp, id);
- m_add_string(p_smtp, line);
+ m_create(p_smtp, IMSG_QUEUE_MESSAGE_FILE, 0, 0, fdout);
+ m_add_id(p_smtp, s->id);
+ m_add_int(p_smtp, 1);
m_close(p_smtp);
+ return;
+}
+
+void
+mfa_build_fd_chain(uint64_t id, int fdout)
+{
+ struct mfa_session *s;
+
+ s = tree_xget(&sessions, id);
+ s->fcurr = TAILQ_LAST(s->filters, mfa_filters);
+ mfa_set_fdout(s, fdout);
}
static struct mfa_query *
@@ -317,7 +420,7 @@ mfa_query(struct mfa_session *s, int type, int hook)
TAILQ_INSERT_TAIL(&s->queries, q, entry);
q->state = QUERY_READY;
- q->current = TAILQ_FIRST(&chain.filters);
+ q->current = TAILQ_FIRST(s->filters);
q->hasrun = 0;
log_trace(TRACE_MFA, "filter: new query %s %s", type_to_str(type),
@@ -329,7 +432,7 @@ mfa_query(struct mfa_session *s, int type, int hook)
static void
mfa_drain_query(struct mfa_query *q)
{
- struct mfa_filter *f;
+ struct mfa_filterproc *proc;
struct mfa_query *prev;
log_trace(TRACE_MFA, "filter: draining query %s", mfa_query_to_text(q));
@@ -377,7 +480,7 @@ mfa_drain_query(struct mfa_query *q)
if (q->type == QT_QUERY) {
log_trace(TRACE_MFA,
- "filter: query 0x%016"PRIx64" done: "
+ "filter: query %016"PRIx64" done: "
"status=%s code=%d response=\"%s\"",
q->qid,
status_to_str(q->smtp.status),
@@ -385,11 +488,11 @@ mfa_drain_query(struct mfa_query *q)
q->smtp.response);
/* Done, notify all listeners and return smtp response */
- while (tree_poproot(&q->notify, NULL, (void**)&f)) {
- m_create(&f->mproc, IMSG_FILTER_NOTIFY, 0, 0, -1);
- m_add_id(&f->mproc, q->qid);
- m_add_int(&f->mproc, q->smtp.status);
- m_close(&f->mproc);
+ while (tree_poproot(&q->notify, NULL, (void**)&proc)) {
+ m_create(&proc->mproc, IMSG_FILTER_NOTIFY, 0, 0, -1);
+ m_add_id(&proc->mproc, q->qid);
+ m_add_int(&proc->mproc, q->smtp.status);
+ m_close(&proc->mproc);
}
m_create(p_smtp, IMSG_MFA_SMTP_RESPONSE, 0, 0, -1);
@@ -405,8 +508,9 @@ mfa_drain_query(struct mfa_query *q)
TAILQ_REMOVE(&q->session->queries, q, entry);
/* If the query was a disconnect event, the session can be freed */
- if (q->type == HOOK_DISCONNECT) {
+ if (q->hook == HOOK_DISCONNECT) {
/* XXX assert prev == NULL */
+ log_trace(TRACE_MFA, "filter: freeing session %016" PRIx64, q->session->id);
free(q->session);
}
@@ -417,7 +521,7 @@ mfa_drain_query(struct mfa_query *q)
static void
mfa_run_query(struct mfa_filter *f, struct mfa_query *q)
{
- if ((f->hooks & q->hook) == 0) {
+ if ((f->proc->hooks & q->hook) == 0) {
log_trace(TRACE_MFA, "filter: skipping filter %s for query %s",
mfa_filter_to_text(f), mfa_query_to_text(q));
return;
@@ -427,100 +531,96 @@ mfa_run_query(struct mfa_filter *f, struct mfa_query *q)
mfa_filter_to_text(f), mfa_query_to_text(q));
if (q->type == QT_QUERY) {
- m_create(&f->mproc, IMSG_FILTER_QUERY, 0, 0, -1);
- m_add_id(&f->mproc, q->session->id);
- m_add_id(&f->mproc, q->qid);
- m_add_int(&f->mproc, q->hook);
+ m_create(&f->proc->mproc, IMSG_FILTER_QUERY, 0, 0, -1);
+ m_add_id(&f->proc->mproc, q->session->id);
+ m_add_id(&f->proc->mproc, q->qid);
+ m_add_int(&f->proc->mproc, q->hook);
switch (q->hook) {
case HOOK_CONNECT:
- m_add_sockaddr(&f->mproc,
+ m_add_sockaddr(&f->proc->mproc,
(struct sockaddr *)&q->u.connect.local);
- m_add_sockaddr(&f->mproc,
+ m_add_sockaddr(&f->proc->mproc,
(struct sockaddr *)&q->u.connect.remote);
- m_add_string(&f->mproc, q->u.connect.hostname);
+ m_add_string(&f->proc->mproc, q->u.connect.hostname);
break;
case HOOK_HELO:
- m_add_string(&f->mproc, q->u.line);
+ m_add_string(&f->proc->mproc, q->u.line);
break;
case HOOK_MAIL:
case HOOK_RCPT:
- m_add_mailaddr(&f->mproc, &q->u.maddr);
+ m_add_mailaddr(&f->proc->mproc, &q->u.maddr);
+ break;
+ case HOOK_EOM:
+ m_add_u32(&f->proc->mproc, q->u.datalen);
break;
default:
break;
}
-
- m_close(&f->mproc);
+ m_close(&f->proc->mproc);
tree_xset(&queries, q->qid, q);
q->state = QUERY_RUNNING;
}
else {
- m_create(&f->mproc, IMSG_FILTER_EVENT, 0, 0, -1);
- m_add_id(&f->mproc, q->session->id);
- m_add_int(&f->mproc, q->hook);
- m_close(&f->mproc);
+ m_create(&f->proc->mproc, IMSG_FILTER_EVENT, 0, 0, -1);
+ m_add_id(&f->proc->mproc, q->session->id);
+ m_add_int(&f->proc->mproc, q->hook);
+ m_close(&f->proc->mproc);
}
}
static void
mfa_filter_imsg(struct mproc *p, struct imsg *imsg)
{
- struct mfa_filter *f;
+ struct mfa_filterproc *proc = p->data;
+ struct mfa_session *s;
struct mfa_query *q, *next;
struct msg m;
const char *line;
- uint64_t id, qid;
- int status, code, notify;
-
- f = p->data;
+ uint64_t qid;
+ uint32_t datalen;
+ int qhook, status, code, notify;
if (imsg == NULL) {
- log_warnx("warn: filter \"%s\" closed unexpectedly",
- p->name);
+ log_warnx("warn: filter \"%s\" closed unexpectedly", p->name);
fatalx("exiting");
}
- log_trace(TRACE_MFA, "filter: imsg %s from filter %s",
+ log_trace(TRACE_MFA, "filter: imsg %s from procfilter %s",
filterimsg_to_str(imsg->hdr.type),
- mfa_filter_to_text(f));
+ mfa_filterproc_to_text(proc));
switch (imsg->hdr.type) {
case IMSG_FILTER_REGISTER:
- if (f->ready) {
+ if (proc->ready) {
log_warnx("warn: filter \"%s\" already registered",
- f->mproc.name);
+ proc->mproc.name);
exit(1);
}
m_msg(&m, imsg);
- m_get_int(&m, &f->hooks);
- m_get_int(&m, &f->flags);
+ m_get_int(&m, &proc->hooks);
+ m_get_int(&m, &proc->flags);
m_end(&m);
- f->ready = 1;
+ proc->ready = 1;
log_debug("debug: filter \"%s\": hooks 0x%08x flags 0x%04x",
- f->mproc.name, f->hooks, f->flags);
+ proc->mproc.name, proc->hooks, proc->flags);
- TAILQ_FOREACH(f, &chain.filters, entry)
- if (!f->ready)
+ TAILQ_FOREACH(proc, &procs, entry)
+ if (!proc->ready)
return;
mfa_ready();
break;
- case IMSG_FILTER_DATA:
- m_msg(&m, imsg);
- m_get_id(&m, &id);
- m_get_string(&m, &line);
- m_end(&m);
- mfa_run_data(TAILQ_NEXT(f, entry), id, line);
- break;
-
case IMSG_FILTER_RESPONSE:
m_msg(&m, imsg);
m_get_id(&m, &qid);
+ m_get_int(&m, &qhook);
+ if (qhook == HOOK_EOM)
+ m_get_u32(&m, &datalen);
m_get_int(&m, &status);
m_get_int(&m, &code);
m_get_int(&m, &notify);
@@ -528,10 +628,13 @@ mfa_filter_imsg(struct mproc *p, struct imsg *imsg)
line = NULL;
else
m_get_string(&m, &line);
-
m_end(&m);
q = tree_xpop(&queries, qid);
+ if (q->hook != qhook) {
+ log_warnx("warn: mfa: hook mismatch %d != %d", q->hook, qhook);
+ fatalx("exiting");
+ }
q->smtp.status = status;
if (code)
q->smtp.code = code;
@@ -541,7 +644,9 @@ mfa_filter_imsg(struct mproc *p, struct imsg *imsg)
}
q->state = (status == FILTER_OK) ? QUERY_READY : QUERY_DONE;
if (notify)
- tree_xset(&q->notify, (uintptr_t)(f), f);
+ tree_xset(&q->notify, (uintptr_t)(proc), proc);
+ if (qhook == HOOK_EOM)
+ q->u.datalen = datalen;
next = TAILQ_NEXT(q, entry);
mfa_drain_query(q);
@@ -554,8 +659,18 @@ mfa_filter_imsg(struct mproc *p, struct imsg *imsg)
mfa_drain_query(next);
break;
+ case IMSG_FILTER_PIPE_SETUP:
+ m_msg(&m, imsg);
+ m_get_id(&m, &qid);
+ m_end(&m);
+
+ s = tree_xget(&sessions, qid);
+ s->fcurr = TAILQ_PREV(s->fcurr, mfa_filters, entry);
+ mfa_set_fdout(s, imsg->fd);
+ break;
+
default:
- log_warnx("bad imsg from filter %s", p->name);
+ log_warnx("warn: bad imsg from filter %s", p->name);
exit(1);
}
}
@@ -606,12 +721,21 @@ mfa_filter_to_text(struct mfa_filter *f)
{
static char buf[1024];
- snprintf(buf, sizeof buf, "%s[hooks=0x%04x,flags=0x%x]",
- f->mproc.name, f->hooks, f->flags);
+ snprintf(buf, sizeof buf, "filter:%s", mfa_filterproc_to_text(f->proc));
return (buf);
}
+static const char *
+mfa_filterproc_to_text(struct mfa_filterproc *proc)
+{
+ static char buf[1024];
+
+ snprintf(buf, sizeof buf, "%s[hooks=0x%08x,flags=0x%04x]",
+ proc->mproc.name, proc->hooks, proc->flags);
+
+ return (buf);
+}
#define CASE(x) case x : return #x
@@ -622,8 +746,9 @@ filterimsg_to_str(int imsg)
CASE(IMSG_FILTER_REGISTER);
CASE(IMSG_FILTER_EVENT);
CASE(IMSG_FILTER_QUERY);
+ CASE(IMSG_FILTER_PIPE_SETUP);
+ CASE(IMSG_FILTER_PIPE_ABORT);
CASE(IMSG_FILTER_NOTIFY);
- CASE(IMSG_FILTER_DATA);
CASE(IMSG_FILTER_RESPONSE);
default:
return "IMSG_FILTER_???";