/* $OpenBSD: filter.c,v 1.25 2017/01/09 09:53:23 reyk Exp $ */ /* * Copyright (c) 2011 Gilles Chehade * Copyright (c) 2012 Eric Faurot * * Permission to use, copy, modify, and distribute this software for any * purpose with or without fee is hereby granted, provided that the above * copyright notice and this permission notice appear in all copies. * * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "smtpd.h" #include "log.h" enum { QUERY_READY, QUERY_RUNNING, QUERY_DONE }; struct filter_proc { TAILQ_ENTRY(filter_proc) entry; struct mproc mproc; int hooks; int flags; int ready; }; struct filter { TAILQ_ENTRY(filter) entry; struct filter_proc *proc; }; TAILQ_HEAD(filter_lst, filter); TAILQ_HEAD(filter_query_lst, filter_query); struct filter_session { uint64_t id; int terminate; struct filter_lst *filters; struct filter *fcurr; int error; struct io *iev; size_t idatalen; FILE *ofile; struct filter_query *eom; }; struct filter_query { uint64_t qid; int type; struct filter_session *session; int state; struct filter *current; /* current data */ union { struct { struct sockaddr_storage local; struct sockaddr_storage remote; char hostname[HOST_NAME_MAX+1]; } connect; char line[LINE_MAX]; struct mailaddr maddr; size_t datalen; } u; /* current response */ struct { int status; int code; char *response; } smtp; }; static void filter_imsg(struct mproc *, struct imsg *); static void filter_post_event(uint64_t, int, struct filter *, struct filter *); static struct filter_query *filter_query(struct filter_session *, int); static void filter_drain_query(struct filter_query *); static void filter_run_query(struct filter *, struct filter_query *); static void filter_end_query(struct filter_query *); static void filter_set_sink(struct filter_session *, int); static int filter_tx(struct filter_session *, int); static void filter_tx_io(struct io *, int, void *); static TAILQ_HEAD(, filter_proc) procs; struct dict chains; static const char * filter_session_to_text(struct filter_session *); static const char * filter_query_to_text(struct filter_query *); static const char * filter_to_text(struct filter *); static const char * filter_proc_to_text(struct filter_proc *); static const char * query_to_str(int); static const char * event_to_str(int); static const char * status_to_str(int); static const char * filterimsg_to_str(int); struct tree sessions; struct tree queries; static void filter_add_arg(struct filter_conf *filter, char *arg) { if (filter->argc == MAX_FILTER_ARGS) { log_warnx("warn: filter \"%s\" is full", filter->name); fatalx("exiting"); } filter->argv[filter->argc++] = arg; } static void filter_extend_chain(struct filter_lst *chain, const char *name) { struct filter *n; struct filter_lst *fchain; struct filter_conf *fconf; int i; fconf = dict_xget(&env->sc_filters, name); if (fconf->chain) { log_debug("filter: extending with \"%s\"", name); for (i = 0; i < fconf->argc; i++) filter_extend_chain(chain, fconf->argv[i]); } else { log_debug("filter: adding filter \"%s\"", name); n = xcalloc(1, sizeof(*n), "filter_extend_chain"); fchain = dict_get(&chains, name); n->proc = TAILQ_FIRST(fchain)->proc; TAILQ_INSERT_TAIL(chain, n, entry); } } void filter_postfork(void) { static int prepare = 0; struct filter_conf *filter; void *iter; struct filter_proc *proc; struct filter_lst *fchain; struct filter *f; struct mproc *p; int done, i; if (prepare) return; prepare = 1; TAILQ_INIT(&procs); dict_init(&chains); log_debug("filter: building simple chains..."); /* create all filter proc and associated chains */ iter = NULL; while (dict_iter(&env->sc_filters, &iter, NULL, (void **)&filter)) { if (filter->chain) continue; log_debug("filter: building simple chain \"%s\"", filter->name); proc = xcalloc(1, sizeof(*proc), "filter_postfork"); p = &proc->mproc; p->handler = filter_imsg; p->proc = PROC_FILTER; p->name = xstrdup(filter->name, "filter_postfork"); p->data = proc; if (tracing & TRACE_DEBUG) filter_add_arg(filter, "-v"); if (foreground_log) filter_add_arg(filter, "-d"); if (mproc_fork(p, filter->path, filter->argv) < 0) fatalx("filter_postfork"); log_debug("filter: registering proc \"%s\"", filter->name); f = xcalloc(1, sizeof(*f), "filter_postfork"); f->proc = proc; TAILQ_INSERT_TAIL(&procs, proc, entry); fchain = xcalloc(1, sizeof(*fchain), "filter_postfork"); TAILQ_INIT(fchain); TAILQ_INSERT_TAIL(fchain, f, entry); dict_xset(&chains, filter->name, fchain); filter->done = 1; } log_debug("filter: building complex chains..."); /* resolve all chains */ done = 0; while (!done) { done = 1; iter = NULL; while (dict_iter(&env->sc_filters, &iter, NULL, (void **)&filter)) { if (filter->done) continue; done = 0; filter->done = 1; for (i = 0; i < filter->argc; i++) { if (!dict_get(&chains, filter->argv[i])) { filter->done = 0; break; } } if (filter->done == 0) continue; fchain = xcalloc(1, sizeof(*fchain), "filter_postfork"); TAILQ_INIT(fchain); log_debug("filter: building chain \"%s\"...", filter->name); for (i = 0; i < filter->argc; i++) filter_extend_chain(fchain, filter->argv[i]); log_debug("filter: done building chain \"%s\"", filter->name); dict_xset(&chains, filter->name, fchain); } } log_debug("filter: done building complex chains"); fchain = xcalloc(1, sizeof(*fchain), "filter_postfork"); TAILQ_INIT(fchain); dict_xset(&chains, "", fchain); } void filter_configure(void) { static int init = 0; struct filter_proc *p; if (init) return; init = 1; tree_init(&sessions); tree_init(&queries); TAILQ_FOREACH(p, &procs, entry) { m_create(&p->mproc, IMSG_FILTER_REGISTER, 0, 0, -1); m_add_u32(&p->mproc, FILTER_API_VERSION); m_add_string(&p->mproc, p->mproc.name); m_close(&p->mproc); mproc_enable(&p->mproc); } if (TAILQ_FIRST(&procs) == NULL) smtp_configure(); } void filter_event(uint64_t id, int event) { struct filter_session *s; if (event == EVENT_DISCONNECT) /* On disconnect, the session is virtualy dead */ s = tree_xpop(&sessions, id); else s = tree_xget(&sessions, id); filter_post_event(id, event, TAILQ_FIRST(s->filters), NULL); if (event == EVENT_DISCONNECT) { if (s->iev) io_free(s->iev); if (s->ofile) fclose(s->ofile); free(s); } } void filter_connect(uint64_t id, const struct sockaddr *local, const struct sockaddr *remote, const char *host, const char *filter) { struct filter_session *s; struct filter_query *q; s = xcalloc(1, sizeof(*s), "filter_event"); s->id = id; if (filter == NULL) filter = ""; s->filters = dict_xget(&chains, filter); tree_xset(&sessions, s->id, s); filter_event(id, EVENT_CONNECT); q = filter_query(s, QUERY_CONNECT); memmove(&q->u.connect.local, local, local->sa_len); memmove(&q->u.connect.remote, remote, remote->sa_len); strlcpy(q->u.connect.hostname, host, sizeof(q->u.connect.hostname)); q->smtp.status = FILTER_OK; q->smtp.code = 0; q->smtp.response = NULL; filter_drain_query(q); } void filter_mailaddr(uint64_t id, int type, const struct mailaddr *maddr) { struct filter_session *s; struct filter_query *q; s = tree_xget(&sessions, id); q = filter_query(s, type); strlcpy(q->u.maddr.user, maddr->user, sizeof(q->u.maddr.user)); strlcpy(q->u.maddr.domain, maddr->domain, sizeof(q->u.maddr.domain)); filter_drain_query(q); } void filter_line(uint64_t id, int type, const char *line) { struct filter_session *s; struct filter_query *q; s = tree_xget(&sessions, id); q = filter_query(s, type); if (line) strlcpy(q->u.line, line, sizeof(q->u.line)); filter_drain_query(q); } void filter_eom(uint64_t id, int type, size_t datalen) { struct filter_session *s; struct filter_query *q; s = tree_xget(&sessions, id); q = filter_query(s, type); q->u.datalen = datalen; filter_drain_query(q); } static void filter_set_sink(struct filter_session *s, int sink) { struct mproc *p; while (s->fcurr) { if (s->fcurr->proc->hooks & HOOK_DATALINE) { log_trace(TRACE_FILTERS, "filter: sending fd %d to %s", sink, filter_to_text(s->fcurr)); p = &s->fcurr->proc->mproc; m_create(p, IMSG_FILTER_PIPE, 0, 0, sink); m_add_id(p, s->id); m_close(p); return; } s->fcurr = TAILQ_PREV(s->fcurr, filter_lst, entry); } log_trace(TRACE_FILTERS, "filter: chain input is %d", sink); smtp_filter_fd(s->id, sink); } void filter_build_fd_chain(uint64_t id, int sink) { struct filter_session *s; int fd; s = tree_xget(&sessions, id); s->fcurr = TAILQ_LAST(s->filters, filter_lst); fd = filter_tx(s, sink); filter_set_sink(s, fd); } void filter_post_event(uint64_t id, int event, struct filter *f, struct filter *end) { for(; f && f != end; f = TAILQ_NEXT(f, entry)) { log_trace(TRACE_FILTERS, "filter: post-event event=%s filter=%s", event_to_str(event), f->proc->mproc.name); m_create(&f->proc->mproc, IMSG_FILTER_EVENT, 0, 0, -1); m_add_id(&f->proc->mproc, id); m_add_int(&f->proc->mproc, event); m_close(&f->proc->mproc); } } static struct filter_query * filter_query(struct filter_session *s, int type) { struct filter_query *q; q = xcalloc(1, sizeof(*q), "filter_query"); q->qid = generate_uid(); q->session = s; q->type = type; q->state = QUERY_READY; q->current = TAILQ_FIRST(s->filters); log_trace(TRACE_FILTERS, "filter: new query %s", query_to_str(type)); return (q); } static void filter_drain_query(struct filter_query *q) { log_trace(TRACE_FILTERS, "filter: filter_drain_query %s", filter_query_to_text(q)); /* * The query must be passed through all filters that registered * a hook, until one rejects it. */ while (q->state != QUERY_DONE) { /* Walk over all filters */ while (q->current) { filter_run_query(q->current, q); if (q->state == QUERY_RUNNING) { log_trace(TRACE_FILTERS, "filter: waiting for running query %s", filter_query_to_text(q)); return; } } q->state = QUERY_DONE; } /* Defer the response if the file is not closed yet. */ if (q->type == QUERY_EOM && q->session->ofile && q->smtp.status == FILTER_OK) { log_debug("filter: deferring eom query..."); q->session->eom = q; return; } filter_end_query(q); } static void filter_run_query(struct filter *f, struct filter_query *q) { log_trace(TRACE_FILTERS, "filter: running filter %s for query %s", filter_to_text(f), filter_query_to_text(q)); m_create(&f->proc->mproc, IMSG_FILTER_QUERY, 0, 0, -1); m_add_id(&f->proc->mproc, q->session->id); m_add_id(&f->proc->mproc, q->qid); m_add_int(&f->proc->mproc, q->type); switch (q->type) { case QUERY_CONNECT: m_add_sockaddr(&f->proc->mproc, (struct sockaddr *)&q->u.connect.local); m_add_sockaddr(&f->proc->mproc, (struct sockaddr *)&q->u.connect.remote); m_add_string(&f->proc->mproc, q->u.connect.hostname); break; case QUERY_HELO: m_add_string(&f->proc->mproc, q->u.line); break; case QUERY_MAIL: case QUERY_RCPT: m_add_mailaddr(&f->proc->mproc, &q->u.maddr); break; case QUERY_EOM: m_add_u32(&f->proc->mproc, q->u.datalen); break; default: break; } m_close(&f->proc->mproc); tree_xset(&queries, q->qid, q); q->state = QUERY_RUNNING; } static void filter_end_query(struct filter_query *q) { struct filter_session *s = q->session; const char *response = q->smtp.response; log_trace(TRACE_FILTERS, "filter: filter_end_query %s", filter_query_to_text(q)); if (q->type == QUERY_EOM && q->smtp.status == FILTER_OK) { if (s->error || q->u.datalen != s->idatalen) { response = "Internal error"; q->smtp.code = 451; q->smtp.status = FILTER_FAIL; if (!s->error) log_warnx("filter: datalen mismatch on session %" PRIx64 ": %zu/%zu", s->id, s->idatalen, q->u.datalen); } } log_trace(TRACE_FILTERS, "filter: query %016"PRIx64" done: " "status=%s code=%d response=\"%s\"", q->qid, status_to_str(q->smtp.status), q->smtp.code, response); smtp_filter_response(s->id, q->type, q->smtp.status, q->smtp.code, response); free(q->smtp.response); free(q); } static void filter_imsg(struct mproc *p, struct imsg *imsg) { struct filter_proc *proc = p->data; struct filter_session *s; struct filter_query *q; struct msg m; const char *line; uint64_t qid; uint32_t datalen; int type, status, code; if (imsg == NULL) { log_warnx("warn: filter \"%s\" closed unexpectedly", p->name); fatalx("exiting"); } log_trace(TRACE_FILTERS, "filter: imsg %s from procfilter %s", filterimsg_to_str(imsg->hdr.type), filter_proc_to_text(proc)); switch (imsg->hdr.type) { case IMSG_FILTER_REGISTER: if (proc->ready) { log_warnx("warn: filter \"%s\" already registered", proc->mproc.name); exit(1); } m_msg(&m, imsg); m_get_int(&m, &proc->hooks); m_get_int(&m, &proc->flags); m_end(&m); proc->ready = 1; log_debug("debug: filter \"%s\": hooks 0x%08x flags 0x%04x", proc->mproc.name, proc->hooks, proc->flags); TAILQ_FOREACH(proc, &procs, entry) if (!proc->ready) return; smtp_configure(); break; case IMSG_FILTER_RESPONSE: m_msg(&m, imsg); m_get_id(&m, &qid); m_get_int(&m, &type); if (type == QUERY_EOM) m_get_u32(&m, &datalen); m_get_int(&m, &status); m_get_int(&m, &code); if (m_is_eom(&m)) line = NULL; else m_get_string(&m, &line); m_end(&m); q = tree_xpop(&queries, qid); if (q->type != type) { log_warnx("warn: filter: type mismatch %d != %d", q->type, type); fatalx("exiting"); } q->smtp.status = status; if (code) q->smtp.code = code; if (line) { free(q->smtp.response); q->smtp.response = xstrdup(line, "filter_imsg"); } q->state = (status == FILTER_OK) ? QUERY_READY : QUERY_DONE; if (type == QUERY_EOM) q->u.datalen = datalen; q->current = TAILQ_NEXT(q->current, entry); filter_drain_query(q); break; case IMSG_FILTER_PIPE: m_msg(&m, imsg); m_get_id(&m, &qid); m_end(&m); s = tree_xget(&sessions, qid); s->fcurr = TAILQ_PREV(s->fcurr, filter_lst, entry); filter_set_sink(s, imsg->fd); break; default: log_warnx("warn: bad imsg from filter %s", p->name); exit(1); } } static int filter_tx(struct filter_session *s, int sink) { int sp[2]; s->idatalen = 0; s->eom = NULL; s->error = 0; if (socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC, sp) == -1) { log_warn("warn: filter: socketpair"); return (-1); } if ((s->ofile = fdopen(sink, "w")) == NULL) { log_warn("warn: filter: fdopen"); close(sp[0]); close(sp[1]); return (-1); } io_set_nonblocking(sp[0]); io_set_nonblocking(sp[1]); s->iev = io_new(); io_set_callback(s->iev, filter_tx_io, s); io_set_fd(s->iev, sp[0]); io_set_read(s->iev); return (sp[1]); } static void filter_tx_io(struct io *io, int evt, void *arg) { struct filter_session *s = arg; size_t len, n; char *data; log_trace(TRACE_FILTERS, "filter: filter_tx_io(%p, %s)", s, io_strevent(evt)); switch (evt) { case IO_DATAIN: data = io_data(s->iev); len = io_datalen(s->iev); log_trace(TRACE_FILTERS, "filter: filter_tx_io: datain (%zu) for req %016"PRIx64"", len, s->id); n = fwrite(data, 1, len, s->ofile); if (n != len) { log_warnx("warn: filter_tx_io: fwrite %zu/%zu", n, len); s->error = 1; break; } s->idatalen += n; io_drop(s->iev, n); return; case IO_DISCONNECTED: log_trace(TRACE_FILTERS, "debug: filter: tx done (%zu) for req %016"PRIx64, s->idatalen, s->id); break; default: log_warn("warn: filter_tx_io: bad evt (%d) for req %016"PRIx64, evt, s->id); s->error = 1; break; } io_free(s->iev); s->iev = NULL; fclose(s->ofile); s->ofile = NULL; /* deferred eom request */ if (s->eom) { log_debug("filter: running eom query..."); filter_end_query(s->eom); } else { log_debug("filter: eom not received yet"); } } static const char * filter_query_to_text(struct filter_query *q) { static char buf[1024]; char tmp[1024]; tmp[0] = '\0'; switch (q->type) { case QUERY_CONNECT: strlcat(tmp, "=", sizeof tmp); strlcat(tmp, ss_to_text(&q->u.connect.local), sizeof tmp); strlcat(tmp, " <-> ", sizeof tmp); strlcat(tmp, ss_to_text(&q->u.connect.remote), sizeof tmp); strlcat(tmp, "(", sizeof tmp); strlcat(tmp, q->u.connect.hostname, sizeof tmp); strlcat(tmp, ")", sizeof tmp); break; case QUERY_MAIL: case QUERY_RCPT: snprintf(tmp, sizeof tmp, "=%s@%s", q->u.maddr.user, q->u.maddr.domain); break; case QUERY_HELO: snprintf(tmp, sizeof tmp, "=%s", q->u.line); break; default: break; } snprintf(buf, sizeof buf, "%016"PRIx64"[%s%s,%s]", q->qid, query_to_str(q->type), tmp, filter_session_to_text(q->session)); return (buf); } static const char * filter_session_to_text(struct filter_session *s) { static char buf[1024]; if (s == NULL) return "filter_session@NULL"; snprintf(buf, sizeof(buf), "filter_session@%p[datalen=%zu,eom=%p,ofile=%p]", s, s->idatalen, s->eom, s->ofile); return buf; } static const char * filter_to_text(struct filter *f) { static char buf[1024]; snprintf(buf, sizeof buf, "filter:%s", filter_proc_to_text(f->proc)); return (buf); } static const char * filter_proc_to_text(struct filter_proc *proc) { static char buf[1024]; snprintf(buf, sizeof buf, "%s[hooks=0x%08x,flags=0x%04x]", proc->mproc.name, proc->hooks, proc->flags); return (buf); } #define CASE(x) case x : return #x static const char * filterimsg_to_str(int imsg) { switch (imsg) { CASE(IMSG_FILTER_REGISTER); CASE(IMSG_FILTER_EVENT); CASE(IMSG_FILTER_QUERY); CASE(IMSG_FILTER_PIPE); CASE(IMSG_FILTER_RESPONSE); default: return "IMSG_FILTER_???"; } } static const char * query_to_str(int query) { switch (query) { CASE(QUERY_CONNECT); CASE(QUERY_HELO); CASE(QUERY_MAIL); CASE(QUERY_RCPT); CASE(QUERY_DATA); CASE(QUERY_EOM); CASE(QUERY_DATALINE); default: return "QUERY_???"; } } static const char * event_to_str(int event) { switch (event) { CASE(EVENT_CONNECT); CASE(EVENT_RESET); CASE(EVENT_DISCONNECT); CASE(EVENT_TX_BEGIN); CASE(EVENT_TX_COMMIT); CASE(EVENT_TX_ROLLBACK); default: return "EVENT_???"; } } static const char * status_to_str(int status) { switch (status) { CASE(FILTER_OK); CASE(FILTER_FAIL); CASE(FILTER_CLOSE); default: return "FILTER_???"; } }