path: root/usr.bin/rsync/io.c
author     Sebastian Benoit <benno@cvs.openbsd.org>    2019-02-10 23:18:29 +0000
committer  Sebastian Benoit <benno@cvs.openbsd.org>    2019-02-10 23:18:29 +0000
commit     06b5b11b56d12bcf3bf6c163fb6dd671c3a11e8f (patch)
tree       2ddb93019588847e0bf5f390280fd39d954e8b7c /usr.bin/rsync/io.c
parent     673f5fd9aadd2dbb6e28fd4fb28f8550d6a702f7 (diff)
Import Kristaps' openrsync into the tree.
OK deraadt@
Diffstat (limited to 'usr.bin/rsync/io.c')
-rw-r--r--  usr.bin/rsync/io.c  585
1 file changed, 585 insertions, 0 deletions
diff --git a/usr.bin/rsync/io.c b/usr.bin/rsync/io.c
new file mode 100644
index 00000000000..630701dbb13
--- /dev/null
+++ b/usr.bin/rsync/io.c
@@ -0,0 +1,585 @@
+/* $Id: io.c,v 1.1 2019/02/10 23:18:28 benno Exp $ */
+/*
+ * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+#include <sys/stat.h>
+
+#include <assert.h>
+#include <endian.h>
+#include <errno.h>
+#include <poll.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "extern.h"
+
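+/*
+ * Check with a zero-timeout poll whether the descriptor has data
+ * ready to be read.
+ * Returns <0 on poll failure, 0 if nothing is pending, >0 otherwise.
+ */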
+int
+io_read_check(struct sess *sess, int fd)
+{
+ struct pollfd pfd;
+
+ pfd.fd = fd;
+ pfd.events = POLLIN;
+
+ if (poll(&pfd, 1, 0) < 0) {
+ ERR(sess, "poll");
+ return -1;
+ }
+ return pfd.revents & POLLIN;
+}
+
+/*
+ * Write buffer to non-blocking descriptor.
+ * Returns zero on failure, non-zero on success (zero or more bytes).
+ */
+static int
+io_write_nonblocking(struct sess *sess,
+ int fd, const void *buf, size_t bsz, size_t *sz)
+{
+ struct pollfd pfd;
+ ssize_t wsz;
+
+ *sz = 0;
+
+ if (0 == bsz)
+ return 1;
+
+ pfd.fd = fd;
+ pfd.events = POLLOUT;
+
+ if (poll(&pfd, 1, INFTIM) < 0) {
+ ERR(sess, "poll");
+ return 0;
+ }
+ if ((pfd.revents & (POLLERR|POLLNVAL))) {
+ ERRX(sess, "poll: bad fd");
+ return 0;
+ } else if ((pfd.revents & POLLHUP)) {
+ ERRX(sess, "poll: hangup");
+ return 0;
+ } else if ( ! (pfd.revents & POLLOUT)) {
+ ERRX(sess, "poll: unknown event");
+ return 0;
+ }
+
+ if ((wsz = write(fd, buf, bsz)) < 0) {
+ ERR(sess, "write");
+ return 0;
+ }
+
+ *sz = wsz;
+ return 1;
+}
+
+/*
+ * Blocking write of the full size of the buffer.
+ * Returns 0 on failure, non-zero on success (all bytes written).
+ */
+static int
+io_write_blocking(struct sess *sess,
+ int fd, const void *buf, size_t sz)
+{
+ size_t wsz;
+ int c;
+
+ while (sz > 0) {
+ c = io_write_nonblocking(sess, fd, buf, sz, &wsz);
+ if ( ! c) {
+ ERRX1(sess, "io_write_nonblocking");
+ return 0;
+ } else if (0 == wsz) {
+ ERRX(sess, "io_write_nonblocking: short write");
+ return 0;
+ }
+ buf += wsz;
+ sz -= wsz;
+ }
+
+ return 1;
+}
+
+/*
+ * Write "buf" of size "sz" to non-blocking descriptor.
+ * Returns zero on failure, non-zero on success (all bytes written to
+ * the descriptor).
+ */
+int
+io_write_buf(struct sess *sess, int fd, const void *buf, size_t sz)
+{
+ int32_t tag, tagbuf;
+ size_t wsz;
+ int c;
+
+ if ( ! sess->mplex_writes) {
+ c = io_write_blocking(sess, fd, buf, sz);
+ sess->total_write += sz;
+ return c;
+ }
+
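+ /*
+ * When multiplexing, each write is framed: a 4-byte little-endian
+ * header whose high byte is the tag (7 for data) and whose low 24
+ * bits hold the payload length, followed by the payload itself.
+ * For example, a 5-byte chunk is preceded by the header value
+ * (7 << 24) + 5 = 0x07000005, i.e., the bytes 05 00 00 07 on the
+ * wire.
+ */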
+ while (sz > 0) {
+ wsz = sz > 0xFFFFFF ? 0xFFFFFF : sz;
+ tag = (7 << 24) + wsz;
+ tagbuf = htole32(tag);
+ if ( ! io_write_blocking(sess, fd, &tagbuf, sizeof(tagbuf))) {
+ ERRX1(sess, "io_write_blocking");
+ return 0;
+ }
+ if ( ! io_write_blocking(sess, fd, buf, wsz)) {
+ ERRX1(sess, "io_write_blocking");
+ return 0;
+ }
+ sess->total_write += wsz;
+ sz -= wsz;
+ buf += wsz;
+ }
+
+ return 1;
+}
+
+/*
+ * Write "line" (NUL-terminated) followed by a newline.
+ * Returns zero on failure, non-zero on success.
+ */
+int
+io_write_line(struct sess *sess, int fd, const char *line)
+{
+
+ if ( ! io_write_buf(sess, fd, line, strlen(line)))
+ ERRX1(sess, "io_write_buf");
+ else if ( ! io_write_byte(sess, fd, '\n'))
+ ERRX1(sess, "io_write_byte");
+ else
+ return 1;
+
+ return 0;
+}
+
+/*
+ * Read buffer from non-blocking descriptor.
+ * Returns zero on failure, non-zero on success (zero or more bytes).
+ */
+static int
+io_read_nonblocking(struct sess *sess,
+ int fd, void *buf, size_t bsz, size_t *sz)
+{
+ struct pollfd pfd;
+ ssize_t rsz;
+
+ *sz = 0;
+
+ if (0 == bsz)
+ return 1;
+
+ pfd.fd = fd;
+ pfd.events = POLLIN;
+
+ if (poll(&pfd, 1, INFTIM) < 0) {
+ ERR(sess, "poll");
+ return 0;
+ }
+ if ((pfd.revents & (POLLERR|POLLNVAL))) {
+ ERRX(sess, "poll: bad fd");
+ return 0;
+ } else if ( ! (pfd.revents & (POLLIN|POLLHUP))) {
+ ERRX(sess, "poll: unknown event");
+ return 0;
+ }
+
+ if ((rsz = read(fd, buf, bsz)) < 0) {
+ ERR(sess, "read");
+ return 0;
+ } else if (0 == rsz) {
+ ERRX(sess, "unexpected end of file");
+ return 0;
+ }
+
+ *sz = rsz;
+ return 1;
+}
+
+/*
+ * Blocking read of the full size of the buffer.
+ * This can be called for either an error-type message or a regular
+ * message---multiplexed or not.
+ * Returns 0 on failure, non-zero on success (all bytes read).
+ */
+static int
+io_read_blocking(struct sess *sess,
+ int fd, void *buf, size_t sz)
+{
+ size_t rsz;
+ int c;
+
+ while (sz > 0) {
+ c = io_read_nonblocking(sess, fd, buf, sz, &rsz);
+ if ( ! c) {
+ ERRX1(sess, "io_read_nonblocking");
+ return 0;
+ } else if (0 == rsz) {
+ ERRX(sess, "io_read_nonblocking: short read");
+ return 0;
+ }
+ buf += rsz;
+ sz -= rsz;
+ }
+
+ return 1;
+}
+
+/*
+ * When we do a lot of writes in a row (such as when the sender emits
+ * the file list), the server might be sending us multiplexed log
+ * messages.
+ * If it sends too many, it clogs the socket.
+ * This function looks into the read buffer and clears out any log
+ * messages pending.
+ * If called when there are valid data reads available, this function
+ * does nothing.
+ * Returns zero on failure, non-zero on success.
+ */
+int
+io_read_flush(struct sess *sess, int fd)
+{
+ int32_t tagbuf, tag;
+ char mpbuf[1024];
+
+ if (sess->mplex_read_remain)
+ return 1;
+
+ /*
+ * First, read the 4-byte multiplex tag.
+ * The first byte is the tag identifier (7 for normal
+ * data, !7 for out-of-band data), the last three are
+ * for the remaining data size.
+ */
+
+ if ( ! io_read_blocking(sess, fd, &tagbuf, sizeof(tagbuf))) {
+ ERRX1(sess, "io_read_blocking");
+ return 0;
+ }
+ tag = le32toh(tagbuf);
+ sess->mplex_read_remain = tag & 0xFFFFFF;
+ tag >>= 24;
+ if (7 == tag)
+ return 1;
+
+ tag -= 7;
+
+ if (sess->mplex_read_remain > sizeof(mpbuf)) {
+ ERRX(sess, "multiplex buffer overflow");
+ return 0;
+ } else if (0 == sess->mplex_read_remain)
+ return 1;
+
+ if ( ! io_read_blocking(sess, fd,
+ mpbuf, sess->mplex_read_remain)) {
+ ERRX1(sess, "io_read_blocking");
+ return 0;
+ }
+ if ('\n' == mpbuf[sess->mplex_read_remain - 1])
+ mpbuf[--sess->mplex_read_remain] = '\0';
+
+ /*
+ * Always print the server's messages, as the server
+ * will control its own log levelling.
+ */
+
+ LOG0(sess, "%.*s", (int)sess->mplex_read_remain, mpbuf);
+ sess->mplex_read_remain = 0;
+
+ /*
+ * I only know that a tag of one means an error.
+ * This means that we should exit.
+ */
+
+ if (1 == tag) {
+ ERRX1(sess, "error from remote host");
+ return 0;
+ }
+ return 1;
+}
+
+/*
+ * Read buffer from non-blocking descriptor, possibly in multiplex read
+ * mode.
+ * Returns zero on failure, non-zero on success (all bytes read from
+ * the descriptor).
+ */
+int
+io_read_buf(struct sess *sess, int fd, void *buf, size_t sz)
+{
+ size_t rsz;
+ int c;
+
+ /* If we're not multiplexing, read directly. */
+
+ if ( ! sess->mplex_reads) {
+ assert(0 == sess->mplex_read_remain);
+ c = io_read_blocking(sess, fd, buf, sz);
+ sess->total_read += sz;
+ return c;
+ }
+
+ while (sz > 0) {
+ /*
+ * First, check to see if we have any regular data
+ * hanging around waiting to be read.
+ * If so, read the lesser of that data and whatever
+ * amount we currently want.
+ */
+
+ if (sess->mplex_read_remain) {
+ rsz = sess->mplex_read_remain < sz ?
+ sess->mplex_read_remain : sz;
+ if ( ! io_read_blocking(sess, fd, buf, rsz)) {
+ ERRX1(sess, "io_read_blocking");
+ return 0;
+ }
+ sz -= rsz;
+ sess->mplex_read_remain -= rsz;
+ buf += rsz;
+ sess->total_read += rsz;
+ continue;
+ }
+
+ assert(0 == sess->mplex_read_remain);
+ if ( ! io_read_flush(sess, fd)) {
+ ERRX1(sess, "io_read_flush");
+ return 0;
+ }
+ }
+
+ return 1;
+}
+
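+/*
+ * Write a 64-bit value, using the short form when possible: small
+ * non-negative values are sent as a plain 32-bit integer, otherwise
+ * the INT32_MAX sentinel is written, followed by the value as a
+ * little-endian 64-bit quantity.
+ * Returns zero on failure, non-zero on success.
+ */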
+int
+io_write_long(struct sess *sess, int fd, int64_t val)
+{
+ int64_t nv;
+
+ /* Short-circuit: send as an integer if possible. */
+
+ if (val < INT32_MAX && val >= 0)
+ return io_write_int(sess, fd, (int32_t)val);
+
+ /* Otherwise, pad with max integer, then send 64-bit. */
+
+ nv = htole64(val);
+
+ if ( ! io_write_int(sess, fd, INT32_MAX))
+ ERRX(sess, "io_write_int");
+ else if ( ! io_write_buf(sess, fd, &nv, sizeof(int64_t)))
+ ERRX(sess, "io_write_buf");
+ else
+ return 1;
+
+ return 0;
+}
+
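+/*
+ * Write a 32-bit value in little-endian byte order.
+ * Returns zero on failure, non-zero on success.
+ */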
+int
+io_write_int(struct sess *sess, int fd, int32_t val)
+{
+ int32_t nv;
+
+ nv = htole32(val);
+
+ if ( ! io_write_buf(sess, fd, &nv, sizeof(int32_t))) {
+ ERRX(sess, "io_write_buf");
+ return 0;
+ }
+ return 1;
+}
+
+/*
+ * A simple assertion-protected memory copy from the input "val" of size
+ * "valsz" into our buffer "buf", full size "buflen", position "bufpos".
+ * Increases our "bufpos" appropriately.
+ * This has no return value, but will assert() if the size of the buffer
+ * is insufficient for the new data.
+ */
+void
+io_buffer_buf(struct sess *sess, void *buf,
+ size_t *bufpos, size_t buflen, const void *val, size_t valsz)
+{
+
+ assert(*bufpos + valsz <= buflen);
+ memcpy(buf + *bufpos, val, valsz);
+ *bufpos += valsz;
+}
+
+/*
+ * Converts "val" to LE prior to io_buffer_buf().
+ */
+void
+io_buffer_int(struct sess *sess, void *buf,
+ size_t *bufpos, size_t buflen, int32_t val)
+{
+ int32_t nv = htole32(val);
+
+ io_buffer_buf(sess, buf, bufpos,
+ buflen, &nv, sizeof(int32_t));
+}
+
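+/*
+ * Read a 64-bit value as written by io_write_long() and require that
+ * it be non-negative.
+ * Returns zero on failure, non-zero on success.
+ */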
+int
+io_read_ulong(struct sess *sess, int fd, uint64_t *val)
+{
+ int64_t oval;
+
+ if ( ! io_read_long(sess, fd, &oval)) {
+ ERRX(sess, "io_read_long");
+ return 0;
+ } else if (oval < 0) {
+ ERRX(sess, "io_read_ulong: negative value");
+ return 0;
+ }
+
+ *val = oval;
+ return 1;
+}
+
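+/*
+ * Read a 64-bit value, undoing the encoding of io_write_long(): a
+ * plain 32-bit integer unless the INT32_MAX sentinel is seen, in
+ * which case the full little-endian 64-bit value follows.
+ * Returns zero on failure, non-zero on success.
+ */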
+int
+io_read_long(struct sess *sess, int fd, int64_t *val)
+{
+ int64_t oval;
+ int32_t sval;
+
+ /* Start with the short-circuit: read as an int. */
+
+ if ( ! io_read_int(sess, fd, &sval)) {
+ ERRX(sess, "io_read_int");
+ return 0;
+ } else if (INT32_MAX != sval) {
+ *val = sval;
+ return 1;
+ }
+
+ /* If the int is maximal, read as 64 bits. */
+
+ if ( ! io_read_buf(sess, fd, &oval, sizeof(int64_t))) {
+ ERRX(sess, "io_read_buf");
+ return 0;
+ }
+
+ *val = le64toh(oval);
+ return 1;
+}
+
+/*
+ * One thing we often need to do is read a size_t.
+ * These are transmitted as int32_t, so make sure that the value
+ * transmitted is not out of range.
+ * FIXME: I assume that size_t can handle int32_t's max.
+ */
+int
+io_read_size(struct sess *sess, int fd, size_t *val)
+{
+ int32_t oval;
+
+ if ( ! io_read_int(sess, fd, &oval)) {
+ ERRX(sess, "io_read_int");
+ return 0;
+ } else if (oval < 0) {
+ ERRX(sess, "io_read_size: negative value");
+ return 0;
+ }
+
+ *val = oval;
+ return 1;
+}
+
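+/*
+ * Read a 32-bit value in little-endian byte order.
+ * Returns zero on failure, non-zero on success.
+ */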
+int
+io_read_int(struct sess *sess, int fd, int32_t *val)
+{
+ int32_t oval;
+
+ if ( ! io_read_buf(sess, fd, &oval, sizeof(int32_t))) {
+ ERRX(sess, "io_read_buf");
+ return 0;
+ }
+
+ *val = le32toh(oval);
+ return 1;
+}
+
+/*
+ * Copies "valsz" from "buf", full size "bufsz" at position "bufpos",
+ * into "val".
+ * Calls assert() if the source doesn't have enough data.
+ * Increases "bufpos" to the new position.
+ */
+void
+io_unbuffer_buf(struct sess *sess, const void *buf,
+ size_t *bufpos, size_t bufsz, void *val, size_t valsz)
+{
+
+ assert(*bufpos + valsz <= bufsz);
+ memcpy(val, buf + *bufpos, valsz);
+ *bufpos += valsz;
+}
+
+/*
+ * Calls io_unbuffer_buf() and converts from LE.
+ */
+void
+io_unbuffer_int(struct sess *sess, const void *buf,
+ size_t *bufpos, size_t bufsz, int32_t *val)
+{
+ int32_t oval;
+
+ io_unbuffer_buf(sess, buf, bufpos,
+ bufsz, &oval, sizeof(int32_t));
+ *val = le32toh(oval);
+}
+
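+/*
+ * Extracts a size transmitted as an int32_t from the buffer via
+ * io_unbuffer_int() and rejects negative values.
+ * Returns zero on failure, non-zero on success.
+ */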
+int
+io_unbuffer_size(struct sess *sess, const void *buf,
+ size_t *bufpos, size_t bufsz, size_t *val)
+{
+ int32_t oval;
+
+ io_unbuffer_int(sess, buf, bufpos, bufsz, &oval);
+ if (oval < 0) {
+ ERRX(sess, "io_unbuffer_size: negative value");
+ return 0;
+ }
+ *val = oval;
+ return 1;
+}
+
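+/*
+ * Read a single byte from the descriptor.
+ * Returns zero on failure, non-zero on success.
+ */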
+int
+io_read_byte(struct sess *sess, int fd, uint8_t *val)
+{
+
+ if ( ! io_read_buf(sess, fd, val, sizeof(uint8_t))) {
+ ERRX(sess, "io_read_buf");
+ return 0;
+ }
+ return 1;
+}
+
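+/*
+ * Write a single byte to the descriptor.
+ * Returns zero on failure, non-zero on success.
+ */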
+int
+io_write_byte(struct sess *sess, int fd, uint8_t val)
+{
+
+ if ( ! io_write_buf(sess, fd, &val, sizeof(uint8_t))) {
+ ERRX(sess, "io_write_buf");
+ return 0;
+ }
+ return 1;
+}
+