/* $OpenBSD: queue_backend.c,v 1.63 2018/05/14 15:23:05 gilles Exp $ */ /* * Copyright (c) 2011 Gilles Chehade * * Permission to use, copy, modify, and distribute this software for any * purpose with or without fee is hereby granted, provided that the above * copyright notice and this permission notice appear in all copies. * * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "smtpd.h" #include "log.h" static const char* envelope_validate(struct envelope *); extern struct queue_backend queue_backend_fs; extern struct queue_backend queue_backend_null; extern struct queue_backend queue_backend_proc; extern struct queue_backend queue_backend_ram; static void queue_envelope_cache_add(struct envelope *); static void queue_envelope_cache_update(struct envelope *); static void queue_envelope_cache_del(uint64_t evpid); TAILQ_HEAD(evplst, envelope); static struct tree evpcache_tree; static struct evplst evpcache_list; static struct queue_backend *backend; static int (*handler_close)(void); static int (*handler_message_create)(uint32_t *); static int (*handler_message_commit)(uint32_t, const char*); static int (*handler_message_delete)(uint32_t); static int (*handler_message_fd_r)(uint32_t); static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *); static int (*handler_envelope_delete)(uint64_t); static int (*handler_envelope_update)(uint64_t, const char *, size_t); static int (*handler_envelope_load)(uint64_t, char *, size_t); static int (*handler_envelope_walk)(uint64_t *, char *, size_t); static int (*handler_message_walk)(uint64_t *, char *, size_t, uint32_t, int *, void **); #ifdef QUEUE_PROFILING static struct { struct timespec t0; const char *name; } profile; static inline void profile_enter(const char *name) { if ((profiling & PROFILE_QUEUE) == 0) return; profile.name = name; clock_gettime(CLOCK_MONOTONIC, &profile.t0); } static inline void profile_leave(void) { struct timespec t1, dt; if ((profiling & PROFILE_QUEUE) == 0) return; clock_gettime(CLOCK_MONOTONIC, &t1); timespecsub(&t1, &profile.t0, &dt); log_debug("profile-queue: %s %lld.%09ld", profile.name, (long long)dt.tv_sec, dt.tv_nsec); } #else #define profile_enter(x) do {} while (0) #define profile_leave() do {} while (0) #endif static int queue_message_path(uint32_t msgid, char *buf, size_t len) { return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid); } int queue_init(const char *name, int server) { struct passwd *pwq; struct group *gr; int r; pwq = getpwnam(SMTPD_QUEUE_USER); if (pwq == NULL) errx(1, "unknown user %s", SMTPD_QUEUE_USER); gr = getgrnam(SMTPD_QUEUE_GROUP); if (gr == NULL) errx(1, "unknown group %s", SMTPD_QUEUE_GROUP); tree_init(&evpcache_tree); TAILQ_INIT(&evpcache_list); if (!strcmp(name, "fs")) backend = &queue_backend_fs; else if (!strcmp(name, "null")) backend = &queue_backend_null; else if (!strcmp(name, "ram")) backend = &queue_backend_ram; else backend = &queue_backend_proc; if (server) { if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0) errx(1, "error in spool directory setup"); if (ckdir(PATH_SPOOL PATH_OFFLINE, 0770, 0, gr->gr_gid, 1) == 0) errx(1, "error in offline directory setup"); if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0) errx(1, "error in purge directory setup"); mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE); if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0) errx(1, "error in purge directory setup"); } r = backend->init(pwq, server, name); log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r); return (r); } int queue_close(void) { if (handler_close) return (handler_close()); return (1); } int queue_message_create(uint32_t *msgid) { int r; profile_enter("queue_message_create"); r = handler_message_create(msgid); profile_leave(); log_trace(TRACE_QUEUE, "queue-backend: queue_message_create() -> %d (%08"PRIx32")", r, *msgid); return (r); } int queue_message_delete(uint32_t msgid) { char msgpath[PATH_MAX]; uint64_t evpid; void *iter; int r; profile_enter("queue_message_delete"); r = handler_message_delete(msgid); profile_leave(); /* in case the message is incoming */ queue_message_path(msgid, msgpath, sizeof(msgpath)); unlink(msgpath); /* remove remaining envelopes from the cache if any (on rollback) */ evpid = msgid_to_evpid(msgid); for (;;) { iter = NULL; if (!tree_iterfrom(&evpcache_tree, &iter, evpid, &evpid, NULL)) break; if (evpid_to_msgid(evpid) != msgid) break; queue_envelope_cache_del(evpid); } log_trace(TRACE_QUEUE, "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r); return (r); } int queue_message_commit(uint32_t msgid) { int r; char msgpath[PATH_MAX]; char tmppath[PATH_MAX]; FILE *ifp = NULL; FILE *ofp = NULL; profile_enter("queue_message_commit"); queue_message_path(msgid, msgpath, sizeof(msgpath)); if (env->sc_queue_flags & QUEUE_COMPRESSION) { bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath); ifp = fopen(msgpath, "r"); ofp = fopen(tmppath, "w+"); if (ifp == NULL || ofp == NULL) goto err; if (!compress_file(ifp, ofp)) goto err; fclose(ifp); fclose(ofp); ifp = NULL; ofp = NULL; if (rename(tmppath, msgpath) == -1) { if (errno == ENOSPC) return (0); unlink(tmppath); log_warn("rename"); return (0); } } if (env->sc_queue_flags & QUEUE_ENCRYPTION) { bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath); ifp = fopen(msgpath, "r"); ofp = fopen(tmppath, "w+"); if (ifp == NULL || ofp == NULL) goto err; if (!crypto_encrypt_file(ifp, ofp)) goto err; fclose(ifp); fclose(ofp); ifp = NULL; ofp = NULL; if (rename(tmppath, msgpath) == -1) { if (errno == ENOSPC) return (0); unlink(tmppath); log_warn("rename"); return (0); } } r = handler_message_commit(msgid, msgpath); profile_leave(); /* in case it's not done by the backend */ unlink(msgpath); log_trace(TRACE_QUEUE, "queue-backend: queue_message_commit(%08"PRIx32") -> %d", msgid, r); return (r); err: if (ifp) fclose(ifp); if (ofp) fclose(ofp); return 0; } int queue_message_fd_r(uint32_t msgid) { int fdin = -1, fdout = -1, fd = -1; FILE *ifp = NULL; FILE *ofp = NULL; profile_enter("queue_message_fd_r"); fdin = handler_message_fd_r(msgid); profile_leave(); log_trace(TRACE_QUEUE, "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin); if (fdin == -1) return (-1); if (env->sc_queue_flags & QUEUE_ENCRYPTION) { if ((fdout = mktmpfile()) == -1) goto err; if ((fd = dup(fdout)) == -1) goto err; if ((ifp = fdopen(fdin, "r")) == NULL) goto err; fdin = fd; fd = -1; if ((ofp = fdopen(fdout, "w+")) == NULL) goto err; if (!crypto_decrypt_file(ifp, ofp)) goto err; fclose(ifp); ifp = NULL; fclose(ofp); ofp = NULL; lseek(fdin, SEEK_SET, 0); } if (env->sc_queue_flags & QUEUE_COMPRESSION) { if ((fdout = mktmpfile()) == -1) goto err; if ((fd = dup(fdout)) == -1) goto err; if ((ifp = fdopen(fdin, "r")) == NULL) goto err; fdin = fd; fd = -1; if ((ofp = fdopen(fdout, "w+")) == NULL) goto err; if (!uncompress_file(ifp, ofp)) goto err; fclose(ifp); ifp = NULL; fclose(ofp); ofp = NULL; lseek(fdin, SEEK_SET, 0); } return (fdin); err: if (fd != -1) close(fd); if (fdin != -1) close(fdin); if (fdout != -1) close(fdout); if (ifp) fclose(ifp); if (ofp) fclose(ofp); return -1; } int queue_message_fd_rw(uint32_t msgid) { char buf[PATH_MAX]; queue_message_path(msgid, buf, sizeof(buf)); return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600); } static int queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) { char *evp; size_t evplen; size_t complen; char compbuf[sizeof(struct envelope)]; size_t enclen; char encbuf[sizeof(struct envelope)]; evp = evpbuf; evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize); if (evplen == 0) return (0); if (env->sc_queue_flags & QUEUE_COMPRESSION) { complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf); if (complen == 0) return (0); evp = compbuf; evplen = complen; } if (env->sc_queue_flags & QUEUE_ENCRYPTION) { enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf); if (enclen == 0) return (0); evp = encbuf; evplen = enclen; } memmove(evpbuf, evp, evplen); return (evplen); } static int queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) { char *evp; size_t evplen; char compbuf[sizeof(struct envelope)]; size_t complen; char encbuf[sizeof(struct envelope)]; size_t enclen; evp = evpbuf; evplen = evpbufsize; if (env->sc_queue_flags & QUEUE_ENCRYPTION) { enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf); if (enclen == 0) return (0); evp = encbuf; evplen = enclen; } if (env->sc_queue_flags & QUEUE_COMPRESSION) { complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf); if (complen == 0) return (0); evp = compbuf; evplen = complen; } return (envelope_load_buffer(ep, evp, evplen)); } static void queue_envelope_cache_add(struct envelope *e) { struct envelope *cached; while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size) queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id); cached = xcalloc(1, sizeof *cached, "queue_envelope_cache_add"); *cached = *e; TAILQ_INSERT_HEAD(&evpcache_list, cached, entry); tree_xset(&evpcache_tree, e->id, cached); stat_increment("queue.evpcache.size", 1); } static void queue_envelope_cache_update(struct envelope *e) { struct envelope *cached; if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) { queue_envelope_cache_add(e); stat_increment("queue.evpcache.update.missed", 1); } else { TAILQ_REMOVE(&evpcache_list, cached, entry); *cached = *e; TAILQ_INSERT_HEAD(&evpcache_list, cached, entry); stat_increment("queue.evpcache.update.hit", 1); } } static void queue_envelope_cache_del(uint64_t evpid) { struct envelope *cached; if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL) return; TAILQ_REMOVE(&evpcache_list, cached, entry); free(cached); stat_decrement("queue.evpcache.size", 1); } int queue_envelope_create(struct envelope *ep) { int r; char evpbuf[sizeof(struct envelope)]; size_t evplen; uint64_t evpid; uint32_t msgid; ep->creation = time(NULL); evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); if (evplen == 0) return (0); evpid = ep->id; msgid = evpid_to_msgid(evpid); profile_enter("queue_envelope_create"); r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id); profile_leave(); log_trace(TRACE_QUEUE, "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")", evpid, evplen, r, ep->id); if (!r) { ep->creation = 0; ep->id = 0; } if (r && env->sc_queue_flags & QUEUE_EVPCACHE) queue_envelope_cache_add(ep); return (r); } int queue_envelope_delete(uint64_t evpid) { int r; if (env->sc_queue_flags & QUEUE_EVPCACHE) queue_envelope_cache_del(evpid); profile_enter("queue_envelope_delete"); r = handler_envelope_delete(evpid); profile_leave(); log_trace(TRACE_QUEUE, "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d", evpid, r); return (r); } int queue_envelope_load(uint64_t evpid, struct envelope *ep) { const char *e; char evpbuf[sizeof(struct envelope)]; size_t evplen; struct envelope *cached; if ((env->sc_queue_flags & QUEUE_EVPCACHE) && (cached = tree_get(&evpcache_tree, evpid))) { *ep = *cached; stat_increment("queue.evpcache.load.hit", 1); return (1); } ep->id = evpid; profile_enter("queue_envelope_load"); evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf); profile_leave(); log_trace(TRACE_QUEUE, "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu", evpid, evplen); if (evplen == 0) return (0); if (queue_envelope_load_buffer(ep, evpbuf, evplen)) { if ((e = envelope_validate(ep)) == NULL) { ep->id = evpid; if (env->sc_queue_flags & QUEUE_EVPCACHE) { queue_envelope_cache_add(ep); stat_increment("queue.evpcache.load.missed", 1); } return (1); } log_warnx("warn: invalid envelope %016" PRIx64 ": %s", evpid, e); } return (0); } int queue_envelope_update(struct envelope *ep) { char evpbuf[sizeof(struct envelope)]; size_t evplen; int r; evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); if (evplen == 0) return (0); profile_enter("queue_envelope_update"); r = handler_envelope_update(ep->id, evpbuf, evplen); profile_leave(); if (r && env->sc_queue_flags & QUEUE_EVPCACHE) queue_envelope_cache_update(ep); log_trace(TRACE_QUEUE, "queue-backend: queue_envelope_update(%016"PRIx64") -> %d", ep->id, r); return (r); } int queue_message_walk(struct envelope *ep, uint32_t msgid, int *done, void **data) { char evpbuf[sizeof(struct envelope)]; uint64_t evpid; int r; const char *e; profile_enter("queue_message_walk"); r = handler_message_walk(&evpid, evpbuf, sizeof evpbuf, msgid, done, data); profile_leave(); log_trace(TRACE_QUEUE, "queue-backend: queue_message_walk() -> %d (%016"PRIx64")", r, evpid); if (r == -1) return (r); if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) { if ((e = envelope_validate(ep)) == NULL) { ep->id = evpid; /* * do not cache the envelope here, while discovering * envelopes one could re-run discover on already * scheduled envelopes which leads to triggering of * strict checks in caching. Envelopes could anyway * be loaded from backend if it isn't cached. */ return (1); } log_warnx("warn: invalid envelope %016" PRIx64 ": %s", evpid, e); } return (0); } int queue_envelope_walk(struct envelope *ep) { const char *e; uint64_t evpid; char evpbuf[sizeof(struct envelope)]; int r; profile_enter("queue_envelope_walk"); r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf); profile_leave(); log_trace(TRACE_QUEUE, "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")", r, evpid); if (r == -1) return (r); if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) { if ((e = envelope_validate(ep)) == NULL) { ep->id = evpid; if (env->sc_queue_flags & QUEUE_EVPCACHE) queue_envelope_cache_add(ep); return (1); } log_warnx("warn: invalid envelope %016" PRIx64 ": %s", evpid, e); } return (0); } uint32_t queue_generate_msgid(void) { uint32_t msgid; while ((msgid = arc4random()) == 0) ; return msgid; } uint64_t queue_generate_evpid(uint32_t msgid) { uint32_t rnd; uint64_t evpid; while ((rnd = arc4random()) == 0) ; evpid = msgid; evpid <<= 32; evpid |= rnd; return evpid; } static const char* envelope_validate(struct envelope *ep) { if (ep->version != SMTPD_ENVELOPE_VERSION) return "version mismatch"; if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL) return "invalid helo"; if (ep->helo[0] == '\0') return "empty helo"; if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL) return "invalid hostname"; if (ep->hostname[0] == '\0') return "empty hostname"; if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL) return "invalid error line"; return NULL; } void queue_api_on_close(int(*cb)(void)) { handler_close = cb; } void queue_api_on_message_create(int(*cb)(uint32_t *)) { handler_message_create = cb; } void queue_api_on_message_commit(int(*cb)(uint32_t, const char *)) { handler_message_commit = cb; } void queue_api_on_message_delete(int(*cb)(uint32_t)) { handler_message_delete = cb; } void queue_api_on_message_fd_r(int(*cb)(uint32_t)) { handler_message_fd_r = cb; } void queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *)) { handler_envelope_create = cb; } void queue_api_on_envelope_delete(int(*cb)(uint64_t)) { handler_envelope_delete = cb; } void queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t)) { handler_envelope_update = cb; } void queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t)) { handler_envelope_load = cb; } void queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t)) { handler_envelope_walk = cb; } void queue_api_on_message_walk(int(*cb)(uint64_t *, char *, size_t, uint32_t, int *, void **)) { handler_message_walk = cb; }