summaryrefslogtreecommitdiff
path: root/usr.bin/rsync/sender.c
diff options
context:
space:
mode:
authorSebastian Benoit <benno@cvs.openbsd.org>2019-02-18 21:34:55 +0000
committerSebastian Benoit <benno@cvs.openbsd.org>2019-02-18 21:34:55 +0000
commitbde034a382776f3c396eef446206fe579292673e (patch)
treecc19160b5f11f9dcb25f390e1b1f89f36bb5b3f2 /usr.bin/rsync/sender.c
parent574102a6ff9a74c28f2f7825a5f76cbd68781424 (diff)
sync with kristaps up to Sun Feb 17 2019
339cf5998c0c022623cd68de50722b6c14543952 Push "error trail" further into code. baf58ce5fe1bc6ce431b3b0ac8264b83ae8c7d02 Document all arguments. Add common -av usage. Remove bits about not supporting anything but files/dirs. 821a811a8c80e52fb56b241fc65a16cae1b4fb2c Disambiguate as prodded by deraadt@ 6c4475b8f226e9031ec0ec1b3f14f7d347132c87 Add -h to usage string 4d344ae6156873b44c95de0c1ed629e637c2d7ab Clarify error message language, use service name instead of port, specify that the socket is SOCK_STREAM. From deraadt@. Tweaked for lowercase messages. f3ec049e76257fc96bcdc872f1d3b967b98f3eb6 In consideration to benno@'s comments, let the mktemp functions propogate an errno handled by the caller. Also keep the original line lengths. While in mktemp.c, make some defines into an enum. e116c2bd00e634b56e4276120135915ceaa31cf2 Put the FSM of the sender into its own function. Put dry_run ack and end of phase ack into the send buffer too, further reducing the possibility of deadlock. c7745aa4c7394ca89d841f8ee76782256d694340 Make the sender write loop be fully non-blocking. This frees us of deadlocking the protocol because the sender will always be able to pull down data. 93c7b4843e80aeac2ec6ae6ffc395df4deaf4a31 Remove "yoda" notation to be more in tune with OpenBSD. Most found by deraadt@.
Diffstat (limited to 'usr.bin/rsync/sender.c')
-rw-r--r--usr.bin/rsync/sender.c376
1 files changed, 203 insertions, 173 deletions
diff --git a/usr.bin/rsync/sender.c b/usr.bin/rsync/sender.c
index d42d3a1a342..bb2c148538e 100644
--- a/usr.bin/rsync/sender.c
+++ b/usr.bin/rsync/sender.c
@@ -1,4 +1,4 @@
-/* $Id: sender.c,v 1.13 2019/02/17 16:34:04 deraadt Exp $ */
+/* $Id: sender.c,v 1.14 2019/02/18 21:34:54 benno Exp $ */
/*
* Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
*
@@ -34,7 +34,7 @@
* A request from the receiver to download updated file data.
*/
struct send_dl {
- int32_t idx; /* index in our file list */
+ int32_t idx; /* index in our file list */
struct blkset *blks; /* the sender's block information */
TAILQ_ENTRY(send_dl) entries;
};
@@ -46,7 +46,6 @@ struct send_dl {
struct send_up {
struct send_dl *cur; /* file being updated or NULL */
struct blkstat stat; /* status of file being updated */
- int primed; /* blk_recv_ack() was called */
};
TAILQ_HEAD(send_dlq, send_dl);
@@ -87,7 +86,189 @@ send_up_reset(struct send_up *p)
p->stat.offs = 0;
p->stat.hint = 0;
p->stat.curst = BLKSTAT_NONE;
- p->primed = 0;
+}
+
+/*
+ * This is the bulk of the sender work.
+ * Here we tend to an output buffer that responds to receiver requests
+ * for data.
+ * This does not act upon the output descriptor itself so as to avoid
+ * blocking, which otherwise would deadlock the protocol.
+ * Returns zero on failure, non-zero on success.
+ */
+static int
+send_up_fsm(struct sess *sess, size_t *phase,
+ struct send_up *up, void **wb, size_t *wbsz, size_t *wbmax,
+ const struct flist *fl)
+{
+ size_t pos = 0, isz = sizeof(int32_t),
+ dsz = MD4_DIGEST_LENGTH;
+ unsigned char fmd[MD4_DIGEST_LENGTH];
+ off_t sz;
+ char buf[20];
+
+ switch (up->stat.curst) {
+ case BLKSTAT_DATA:
+ /*
+ * A data segment to be written: buffer both the length
+ * and the data.
+ * If we've finished the transfer, move on to the token;
+ * otherwise, keep sending data.
+ */
+
+ sz = MINIMUM(MAX_CHUNK,
+ up->stat.curlen - up->stat.curpos);
+ if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {
+ ERRX1(sess, "io_lowbuffer_alloc");
+ return 0;
+ }
+ io_lowbuffer_int(sess, *wb, &pos, *wbsz, sz);
+ if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, sz)) {
+ ERRX1(sess, "io_lowbuffer_alloc");
+ return 0;
+ }
+ io_lowbuffer_buf(sess, *wb, &pos, *wbsz,
+ up->stat.map + up->stat.curpos, sz);
+
+ up->stat.curpos += sz;
+ if (up->stat.curpos == up->stat.curlen)
+ up->stat.curst = BLKSTAT_TOK;
+ return 1;
+ case BLKSTAT_TOK:
+ /*
+ * The data token following (maybe) a data segment.
+ * These can also come standalone if, say, the file's
+ * being fully written.
+ * It's followed by a hash or another data segment,
+ * depending on the token.
+ */
+
+ if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {
+ ERRX1(sess, "io_lowbuffer_alloc");
+ return 0;
+ }
+ io_lowbuffer_int(sess, *wb,
+ &pos, *wbsz, up->stat.curtok);
+ up->stat.curst = up->stat.curtok ?
+ BLKSTAT_NEXT : BLKSTAT_HASH;
+ return 1;
+ case BLKSTAT_HASH:
+ /*
+ * The hash following transmission of all file contents.
+ * This is always followed by the state that we're
+ * finished with the file.
+ */
+
+ hash_file(up->stat.map, up->stat.mapsz, fmd, sess);
+ if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, dsz)) {
+ ERRX1(sess, "io_lowbuffer_alloc");
+ return 0;
+ }
+ io_lowbuffer_buf(sess, *wb, &pos, *wbsz, fmd, dsz);
+ up->stat.curst = BLKSTAT_DONE;
+ return 1;
+ case BLKSTAT_DONE:
+ /*
+ * The data has been written.
+ * Clear our current send file and allow the block below
+ * to find another.
+ */
+
+ LOG3(sess, "%s: flushed %jd KB total, %.2f%% uploaded",
+ fl[up->cur->idx].path,
+ (intmax_t)up->stat.total / 1024,
+ 100.0 * up->stat.dirty / up->stat.total);
+ send_up_reset(up);
+ return 1;
+ case BLKSTAT_PHASE:
+ /*
+ * This is where we actually stop the algorithm: we're
+ * already at the second phase.
+ */
+
+ send_up_reset(up);
+ (*phase)++;
+ return 1;
+ case BLKSTAT_NEXT:
+ /*
+ * Our last case: we need to find the
+ * next block (and token) to transmit to
+ * the receiver.
+ * These will drive the finite state
+ * machine in the first few conditional
+ * blocks of this set.
+ */
+
+ assert(up->stat.fd != -1);
+ blk_match(sess, up->cur->blks,
+ fl[up->cur->idx].path, &up->stat);
+ return 1;
+ case BLKSTAT_NONE:
+ break;
+ }
+
+ assert(BLKSTAT_NONE == up->stat.curst);
+
+ /*
+ * We've either hit the phase change following the last file (or
+ * start, or prior phase change), or we need to prime the next
+ * file for transmission.
+ * We special-case dry-run mode.
+ */
+
+ if (up->cur->idx < 0) {
+ if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {
+ ERRX1(sess, "io_lowbuffer_alloc");
+ return 0;
+ }
+ io_lowbuffer_int(sess, *wb, &pos, *wbsz, -1);
+
+ if (sess->opts->server && sess->rver > 27) {
+ if (!io_lowbuffer_alloc(sess,
+ wb, wbsz, wbmax, isz)) {
+ ERRX1(sess, "io_lowbuffer_alloc");
+ return 0;
+ }
+ io_lowbuffer_int(sess, *wb, &pos, *wbsz, -1);
+ }
+ up->stat.curst = BLKSTAT_PHASE;
+ } else if (sess->opts->dry_run) {
+ if (!sess->opts->server)
+ LOG1(sess, "%s", fl[up->cur->idx].wpath);
+
+ if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {
+ ERRX1(sess, "io_lowbuffer_alloc");
+ return 0;
+ }
+ io_lowbuffer_int(sess, *wb, &pos, *wbsz, up->cur->idx);
+ up->stat.curst = BLKSTAT_NEXT;
+ } else {
+ assert(up->stat.fd != -1);
+
+ /*
+ * FIXME: use the nice output of log_file() and so on in
+ * downloader.c, which means moving this into
+ * BLKSTAT_DONE instead of having it be here.
+ */
+
+ if (!sess->opts->server)
+ LOG1(sess, "%s", fl[up->cur->idx].wpath);
+
+ if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, 20)) {
+ ERRX1(sess, "io_lowbuffer_alloc");
+ return 0;
+ }
+ assert(sizeof(buf) == 20);
+ blk_recv_ack(sess, buf, up->cur->blks, up->cur->idx);
+ io_lowbuffer_buf(sess, *wb, &pos, *wbsz, buf, 20);
+
+ LOG3(sess, "%s: primed for %jd B total",
+ fl[up->cur->idx].path,
+ (intmax_t)up->cur->blks->size);
+ up->stat.curst = BLKSTAT_NEXT;
+ }
+
+ return 1;
}
/*
@@ -103,7 +284,7 @@ send_dl_enqueue(struct sess *sess, struct send_dlq *q,
int32_t idx, const struct flist *fl, size_t flsz, int fd)
{
struct send_dl *s;
-
+
/* End-of-phase marker. */
if (idx == -1) {
@@ -118,7 +299,7 @@ send_dl_enqueue(struct sess *sess, struct send_dlq *q,
}
/* Validate the index. */
-
+
if (idx < 0 || (uint32_t)idx >= flsz) {
ERRX(sess, "file index out of bounds: invalid %"
PRId32 " out of %zu", idx, flsz);
@@ -178,7 +359,6 @@ rsync_sender(struct sess *sess, int fdin,
struct flist *fl = NULL;
const struct flist *f;
size_t i, flsz = 0, phase = 0, excl;
- off_t sz;
int rc = 0, c;
int32_t idx;
struct pollfd pfd[3];
@@ -186,9 +366,8 @@ rsync_sender(struct sess *sess, int fdin,
struct send_dl *dl;
struct send_up up;
struct stat st;
- unsigned char filemd[MD4_DIGEST_LENGTH];
void *wbuf = NULL;
- size_t wbufpos = 0, pos, wbufsz = 0, wbufmax = 0;
+ size_t wbufpos = 0, wbufsz = 0, wbufmax = 0;
ssize_t ssz;
if (pledge("stdio getpw rpath unveil", NULL) == -1) {
@@ -215,7 +394,7 @@ rsync_sender(struct sess *sess, int fdin,
/* Client sends zero-length exclusions if deleting. */
if (!sess->opts->server && sess->opts->del &&
- !io_write_int(sess, fdout, 0)) {
+ !io_write_int(sess, fdout, 0)) {
ERRX1(sess, "io_write_int");
goto out;
}
@@ -384,170 +563,20 @@ rsync_sender(struct sess *sess, int fdin,
sess->total_write += ssz;
}
- if (pfd[1].revents & POLLOUT) {
+ /*
+ * Engage the FSM for the current transfer.
+ * If our phase changes, stop processing.
+ */
+
+ if (pfd[1].revents & POLLOUT && up.cur != NULL) {
assert(pfd[2].fd == -1);
assert(wbufpos == 0 && wbufsz == 0);
-
- /*
- * If we have data to write, do it now according
- * to the data finite state machine.
- * If we receive an invalid index (-1), then
- * we're either promoted to the second phase or
- * it's time to exit, depending upon which phase
- * we're in.
- * Otherwise, we either start a transfer
- * sequence (if not primed) or continue one.
- */
-
- pos = 0;
- if (BLKSTAT_DATA == up.stat.curst) {
- /*
- * A data segment to be written: buffer
- * both the length and the data, then
- * put is in the token phase.
- */
-
- sz = MINIMUM(MAX_CHUNK,
- up.stat.curlen - up.stat.curpos);
- if (!io_lowbuffer_alloc(sess, &wbuf,
- &wbufsz, &wbufmax, sizeof(int32_t))) {
- ERRX1(sess, "io_lowbuffer_alloc");
- goto out;
- }
- io_lowbuffer_int(sess,
- wbuf, &pos, wbufsz, sz);
- if (!io_lowbuffer_alloc(sess, &wbuf,
- &wbufsz, &wbufmax, sz)) {
- ERRX1(sess, "io_lowbuffer_alloc");
- goto out;
- }
- io_lowbuffer_buf(sess, wbuf, &pos, wbufsz,
- up.stat.map + up.stat.curpos, sz);
- up.stat.curpos += sz;
- if (up.stat.curpos == up.stat.curlen)
- up.stat.curst = BLKSTAT_TOK;
- } else if (BLKSTAT_TOK == up.stat.curst) {
- /*
- * The data token following (maybe) a
- * data segment.
- * These can also come standalone if,
- * say, the file's being fully written.
- * It's followed by a hash or another
- * data segment, depending on the token.
- */
-
- if (!io_lowbuffer_alloc(sess, &wbuf,
- &wbufsz, &wbufmax, sizeof(int32_t))) {
- ERRX1(sess, "io_lowbuffer_alloc");
- goto out;
- }
- io_lowbuffer_int(sess, wbuf,
- &pos, wbufsz, up.stat.curtok);
- up.stat.curst = up.stat.curtok ?
- BLKSTAT_NONE : BLKSTAT_HASH;
- } else if (BLKSTAT_HASH == up.stat.curst) {
- /*
- * The hash following transmission of
- * all file contents.
- * This is always followed by the state
- * that we're finished with the file.
- */
-
- hash_file(up.stat.map,
- up.stat.mapsz, filemd, sess);
- if (!io_lowbuffer_alloc(sess, &wbuf,
- &wbufsz, &wbufmax, MD4_DIGEST_LENGTH)) {
- ERRX1(sess, "io_lowbuffer_alloc");
- goto out;
- }
- io_lowbuffer_buf(sess, wbuf, &pos,
- wbufsz, filemd, MD4_DIGEST_LENGTH);
- up.stat.curst = BLKSTAT_DONE;
- } else if (BLKSTAT_DONE == up.stat.curst) {
- /*
- * The data has been written.
- * Clear our current send file and allow
- * the block below to find another.
- */
-
- LOG3(sess, "%s: flushed %jd KB total, "
- "%.2f%% uploaded",
- fl[up.cur->idx].path,
- (intmax_t)up.stat.total / 1024,
- 100.0 * up.stat.dirty / up.stat.total);
- send_up_reset(&up);
- } else if (up.cur != NULL && up.cur->idx < 0) {
- /*
- * We've hit the phase change following
- * the last file (or start, or prior
- * phase change).
- * Simply acknowledge it.
- * FIXME: use buffering.
- */
-
- if (!io_write_int(sess, fdout, -1)) {
- ERRX1(sess, "io_write_int");
- goto out;
- }
- if (sess->opts->server && sess->rver > 27 &&
- !io_write_int(sess, fdout, -1)) {
- ERRX1(sess, "io_write_int");
- goto out;
- }
- send_up_reset(&up);
-
- /*
- * This is where we actually stop the
- * algorithm: we're already at the
- * second phase.
- */
-
- if (phase++)
- break;
- } else if (up.cur != NULL && up.primed == 0) {
- /*
- * We're getting ready to send the file
- * contents to the receiver.
- * FIXME: use buffering.
- */
-
- if (!sess->opts->server)
- LOG1(sess, "%s", fl[up.cur->idx].wpath);
-
- /* Dry-running does nothing but a response. */
-
- if (sess->opts->dry_run &&
- !io_write_int(sess, fdout, up.cur->idx)) {
- ERRX1(sess, "io_write_int");
- goto out;
- }
-
- /* Actually perform the block send. */
-
- assert(up.stat.fd != -1);
- if (!blk_recv_ack(sess, fdout,
- up.cur->blks, up.cur->idx)) {
- ERRX1(sess, "blk_recv_ack");
- goto out;
- }
- LOG3(sess, "%s: primed for %jd B total",
- fl[up.cur->idx].path,
- (intmax_t)up.cur->blks->size);
- up.primed = 1;
- } else if (up.cur != NULL) {
- /*
- * Our last case: we need to find the
- * next block (and token) to transmit to
- * the receiver.
- * These will drive the finite state
- * machine in the first few conditional
- * blocks of this set.
- */
-
- assert(up.stat.fd != -1);
- blk_match(sess, up.cur->blks,
- fl[up.cur->idx].path, &up.stat);
- }
+ if (!send_up_fsm(sess, &phase,
+ &up, &wbuf, &wbufsz, &wbufmax, fl)) {
+ ERRX1(sess, "send_up_fsm");
+ goto out;
+ } else if (phase > 1)
+ break;
}
/*
@@ -586,7 +615,7 @@ rsync_sender(struct sess *sess, int fdin,
pfd[1].fd = fdout;
continue;
}
-
+
/*
* Non-blocking open of file.
* This will be picked up in the state machine
@@ -628,6 +657,7 @@ rsync_sender(struct sess *sess, int fdin,
out:
send_up_reset(&up);
while ((dl = TAILQ_FIRST(&sdlq)) != NULL) {
+ TAILQ_REMOVE(&sdlq, dl, entries);
free(dl->blks);
free(dl);
}