summaryrefslogtreecommitdiff
path: root/usr.bin
diff options
context:
space:
mode:
authorFlorian Obser <florian@cvs.openbsd.org>2019-02-16 16:58:40 +0000
committerFlorian Obser <florian@cvs.openbsd.org>2019-02-16 16:58:40 +0000
commit61713811184ecd5d663058b78ad75321ae689eaf (patch)
treef2d2bd40f77c0237311cfbe3c9a08d2c59f64c7b /usr.bin
parent9f162b46faa21e0ac99e0dde04178f5f7d0c1cbb (diff)
sync with kristaps, commit 1842d31d600f2834ae78fe0d99a29519a853d75c
Make sender mostly nonblocking for writes. This takes a lot of logic once in blocks.c and puts it directly into sender.c. It allows running openrsync against itself without deadlocks.
Diffstat (limited to 'usr.bin')
-rw-r--r--usr.bin/rsync/blocks.c206
-rw-r--r--usr.bin/rsync/extern.h48
-rw-r--r--usr.bin/rsync/io.c8
-rw-r--r--usr.bin/rsync/sender.c271
4 files changed, 312 insertions, 221 deletions
diff --git a/usr.bin/rsync/blocks.c b/usr.bin/rsync/blocks.c
index 2849945d9c0..5ba80469634 100644
--- a/usr.bin/rsync/blocks.c
+++ b/usr.bin/rsync/blocks.c
@@ -1,4 +1,4 @@
-/* $Id: blocks.c,v 1.8 2019/02/16 16:57:48 florian Exp $ */
+/* $Id: blocks.c,v 1.9 2019/02/16 16:58:39 florian Exp $ */
/*
* Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
*
@@ -30,38 +30,6 @@
#include "extern.h"
/*
- * Flush out "size" bytes of the buffer, doing all of the appropriate
- * chunking of the data, then the subsequent token (or zero).
- * Return zero on failure, non-zero on success.
- */
-static int
-blk_flush(struct sess *sess, int fd,
- const void *b, off_t size, int32_t token)
-{
- off_t i = 0, sz;
-
- while (i < size) {
- sz = MAX_CHUNK < (size - i) ?
- MAX_CHUNK : (size - i);
- if (!io_write_int(sess, fd, sz)) {
- ERRX1(sess, "io_write_int");
- return 0;
- } else if (!io_write_buf(sess, fd, b + i, sz)) {
- ERRX1(sess, "io_write_buf");
- return 0;
- }
- i += sz;
- }
-
- if (!io_write_int(sess, fd, token)) {
- ERRX1(sess, "io_write_int");
- return 0;
- }
-
- return 1;
-}
-
-/*
* From our current position of "offs" in buffer "buf" of total size
* "size", see if we can find a matching block in our list of blocks.
* The "hint" refers to the block that *might* work.
@@ -146,99 +114,18 @@ blk_find(struct sess *sess, const void *buf, off_t size, off_t offs,
}
/*
- * The main reconstruction algorithm on the sender side.
- * This is reentrant: it's meant to be called whenever "fd" unblocks for
- * writing by the sender.
- * Scans byte-wise over the input file, looking for matching blocks in
- * what the server sent us.
- * If a block is found, emit all data up until the block, then the token
- * for the block.
- * The receiving end can then reconstruct the file trivially.
- * Return zero on failure, non-zero on success.
- */
-static int
-blk_match_send(struct sess *sess, const char *path, int fd,
- const struct blkset *blks, struct blkstat *st)
-{
- off_t last, end, sz;
- int32_t tok;
- struct blk *blk;
-
- /*
- * Stop searching at the length of the file minus the size of
- * the last block.
- * The reason for this being that we don't need to do an
- * incremental hash within the last block---if it doesn't match,
- * it doesn't match.
- */
-
- end = st->mapsz + 1 - blks->blks[blks->blksz - 1].len;
- last = st->offs;
-
- for ( ; st->offs < end; st->offs++) {
- blk = blk_find(sess, st->map, st->mapsz,
- st->offs, blks, path, st->hint);
- if (blk == NULL)
- continue;
-
- sz = st->offs - last;
- st->dirty += sz;
- st->total += sz;
- LOG4(sess, "%s: flushing %jd B before %zu B "
- "block %zu", path, (intmax_t)sz, blk->len, blk->idx);
- tok = -(blk->idx + 1);
-
- /*
- * Write the data we have, then follow it with the tag
- * of the block that matches.
- * The receiver will then write our data, then the data
- * it already has in the matching block.
- */
-
- if (!blk_flush(sess, fd, st->map + last, sz, tok)) {
- ERRX1(sess, "blk_flush");
- return -1;
- }
-
- st->total += blk->len;
- st->offs += blk->len;
- st->hint = blk->idx + 1;
- return 0;
- }
-
- /* Emit remaining data and send terminator token. */
-
- sz = st->mapsz - last;
- st->total += sz;
- st->dirty += sz;
-
- LOG4(sess, "%s: flushing remaining %jd B", path, (intmax_t)sz);
-
- if (!blk_flush(sess, fd, st->map + last, sz, 0)) {
- ERRX1(sess, "blk_flush");
- return -1;
- }
-
- LOG3(sess, "%s: flushed (chunked) %jd B total, "
- "%.2f%% upload ratio", path, (intmax_t)st->total,
- 100.0 * st->dirty / st->total);
- return 1;
-}
-
-/*
* Given a local file "path" and the blocks created by a remote machine,
* find out which blocks of our file they don't have and send them.
* This function is reentrant: it must be called while there's still
* data to send.
- * Return 0 if there's more data to send, >0 if the file has completed
- * its update, or <0 on error.
*/
-int
-blk_match(struct sess *sess, int fd, const struct blkset *blks,
+void
+blk_match(struct sess *sess, const struct blkset *blks,
const char *path, struct blkstat *st)
{
- unsigned char filemd[MD4_DIGEST_LENGTH];
- int c;
+ off_t last, end, sz;
+ int32_t tok;
+ struct blk *blk;
/*
* If the file's empty or we don't have any blocks from the
@@ -248,34 +135,69 @@ blk_match(struct sess *sess, int fd, const struct blkset *blks,
*/
if (st->mapsz && blks->blksz) {
- if ((c = blk_match_send(sess, path, fd, blks, st)) < 0) {
- ERRX1(sess, "blk_match_send");
- return -1;
- } else if (c == 0)
- return 0;
- } else {
- if (!blk_flush(sess, fd, st->map, st->mapsz, 0)) {
- ERRX1(sess, "blk_flush");
- return -1;
+ /*
+ * Stop searching at the length of the file minus the
+ * size of the last block.
+ * The reason for this being that we don't need to do an
+ * incremental hash within the last block---if it
+ * doesn't match, it doesn't match.
+ */
+
+ end = st->mapsz + 1 - blks->blks[blks->blksz - 1].len;
+ last = st->offs;
+
+ for ( ; st->offs < end; st->offs++) {
+ blk = blk_find(sess, st->map, st->mapsz,
+ st->offs, blks, path, st->hint);
+ if (blk == NULL)
+ continue;
+
+ sz = st->offs - last;
+ st->dirty += sz;
+ st->total += sz;
+ LOG4(sess, "%s: flushing %jd B before %zu B "
+ "block %zu", path, (intmax_t)sz,
+ blk->len, blk->idx);
+ tok = -(blk->idx + 1);
+
+ /*
+ * Write the data we have, then follow it with
+ * the tag of the block that matches.
+ */
+
+ st->curpos = last;
+ st->curlen = st->curpos + sz;
+ st->curtok = tok;
+ assert(0 != st->curtok);
+ st->curst = sz ? BLKSTAT_DATA : BLKSTAT_TOK;
+ st->total += blk->len;
+ st->offs += blk->len;
+ st->hint = blk->idx + 1;
+ return;
}
- LOG3(sess, "%s: flushed (un-chunked) %jd B, 100%% upload ratio",
- path, (intmax_t)st->mapsz);
- }
- /*
- * Now write the full file hash.
- * Since we're seeding the hash, this always gives us some sort
- * of data even if the file's zero-length.
- */
+ /* Emit remaining data and send terminator token. */
- hash_file(st->map, st->mapsz, filemd, sess);
+ sz = st->mapsz - last;
+ LOG4(sess, "%s: flushing remaining %jd B",
+ path, (intmax_t)sz);
- if (!io_write_buf(sess, fd, filemd, MD4_DIGEST_LENGTH)) {
- ERRX1(sess, "io_write_buf");
- return -1;
+ st->total += sz;
+ st->dirty += sz;
+ st->curpos = last;
+ st->curlen = st->curpos + sz;
+ st->curtok = 0;
+ st->curst = sz ? BLKSTAT_DATA : BLKSTAT_TOK;
+ } else {
+ st->curpos = 0;
+ st->curlen = st->mapsz;
+ st->curtok = 0;
+ st->curst = st->mapsz ? BLKSTAT_DATA : BLKSTAT_TOK;
+ st->dirty = st->total = st->mapsz;
+
+ LOG4(sess, "%s: flushing whole file %zu B",
+ path, st->mapsz);
}
-
- return 1;
}
/*
diff --git a/usr.bin/rsync/extern.h b/usr.bin/rsync/extern.h
index 2fb34629fa7..c8cf4012c26 100644
--- a/usr.bin/rsync/extern.h
+++ b/usr.bin/rsync/extern.h
@@ -1,4 +1,4 @@
-/* $Id: extern.h,v 1.13 2019/02/16 16:57:17 florian Exp $ */
+/* $Id: extern.h,v 1.14 2019/02/16 16:58:39 florian Exp $ */
/*
* Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
*
@@ -41,7 +41,11 @@
#define CSUM_LENGTH_PHASE1 (2)
#define CSUM_LENGTH_PHASE2 (16)
-#define POLL_TIMEOUT (10000)
+/*
+ * Use this for debugging deadlocks.
+ * All poll events will use it and catch time-outs.
+ */
+#define POLL_TIMEOUT (INFTIM)
/*
* Operating mode for a client or a server.
@@ -128,17 +132,30 @@ struct blk {
unsigned char chksum_long[CSUM_LENGTH_PHASE2]; /* slow checksum */
};
+enum blkstatst {
+ BLKSTAT_NONE = 0,
+ BLKSTAT_DATASZ,
+ BLKSTAT_DATA,
+ BLKSTAT_TOK,
+ BLKSTAT_HASH,
+ BLKSTAT_DONE
+};
+
/*
* Information for the sender updating receiver blocks reentrantly.
*/
struct blkstat {
- off_t offs; /* position in sender file */
- off_t total; /* total amount processed */
- off_t dirty; /* total amount sent */
- size_t hint; /* optimisation: next probable block match */
- void *map; /* mapped file or MAP_FAILED otherwise */
- size_t mapsz; /* size of file or zero */
- int fd; /* descriptor girding the map */
+ off_t offs; /* position in sender file */
+ off_t total; /* total amount processed */
+ off_t dirty; /* total amount sent */
+ size_t hint; /* optimisation: next probable match */
+ void *map; /* mapped file or MAP_FAILED otherwise */
+ size_t mapsz; /* size of file or zero */
+ int fd; /* descriptor girding the map */
+ enum blkstatst curst; /* FSM for sending file blocks */
+ off_t curpos; /* sending: position in file to send */
+ off_t curlen; /* sending: length of send */
+ int32_t curtok; /* sending: next matching token or zero */
};
/*
@@ -182,6 +199,10 @@ struct ident {
struct download;
struct upload;
+#ifndef MIN
+# define MIN(_x1, _x2) ((_x1) < (_x2) ? (_x1) : (_x2))
+#endif
+
#define LOG0(_sess, _fmt, ...) \
rsync_log((_sess), __FILE__, __LINE__, -1, (_fmt), ##__VA_ARGS__)
#define LOG1(_sess, _fmt, ...) \
@@ -262,6 +283,13 @@ int io_write_int(struct sess *, int, int32_t);
int io_write_line(struct sess *, int, const char *);
int io_write_long(struct sess *, int, int64_t);
+int io_lowbuffer_alloc(struct sess *, void **,
+ size_t *, size_t *, size_t);
+void io_lowbuffer_int(struct sess *, void *,
+ size_t *, size_t, int32_t);
+void io_lowbuffer_buf(struct sess *, void *,
+ size_t *, size_t, const void *, size_t);
+
void io_buffer_int(struct sess *, void *,
size_t *, size_t, int32_t);
void io_buffer_buf(struct sess *, void *,
@@ -300,7 +328,7 @@ void upload_free(struct upload *);
struct blkset *blk_recv(struct sess *, int, const char *);
int blk_recv_ack(struct sess *,
int, const struct blkset *, int32_t);
-int blk_match(struct sess *, int, const struct blkset *,
+void blk_match(struct sess *, const struct blkset *,
const char *, struct blkstat *);
int blk_send(struct sess *, int, size_t,
const struct blkset *, const char *);
diff --git a/usr.bin/rsync/io.c b/usr.bin/rsync/io.c
index 7c4509df5b3..a8dce4d7d2a 100644
--- a/usr.bin/rsync/io.c
+++ b/usr.bin/rsync/io.c
@@ -1,4 +1,4 @@
-/* $Id: io.c,v 1.7 2019/02/16 16:58:14 florian Exp $ */
+/* $Id: io.c,v 1.8 2019/02/16 16:58:39 florian Exp $ */
/*
* Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
*
@@ -29,12 +29,6 @@
#include "extern.h"
/*
- * Use this for debugging deadlocks.
- * All poll events will use it and catch time-outs.
- */
-#define POLL_TIMEOUT (INFTIM)
-
-/*
* A non-blocking check to see whether there's POLLIN data in fd.
* Returns <0 on failure, 0 if there's no data, >0 if there is.
*/
diff --git a/usr.bin/rsync/sender.c b/usr.bin/rsync/sender.c
index 2bd1449a6b2..7f527f741f5 100644
--- a/usr.bin/rsync/sender.c
+++ b/usr.bin/rsync/sender.c
@@ -1,4 +1,4 @@
-/* $Id: sender.c,v 1.8 2019/02/16 16:57:17 florian Exp $ */
+/* $Id: sender.c,v 1.9 2019/02/16 16:58:39 florian Exp $ */
/*
* Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
*
@@ -26,6 +26,8 @@
#include <string.h>
#include <unistd.h>
+#include <openssl/md4.h>
+
#include "extern.h"
/*
@@ -84,6 +86,7 @@ send_up_reset(struct send_up *p)
p->stat.offs = 0;
p->stat.hint = 0;
+ p->stat.curst = BLKSTAT_NONE;
p->primed = 0;
}
@@ -172,14 +175,21 @@ int
rsync_sender(struct sess *sess, int fdin,
int fdout, size_t argc, char **argv)
{
- struct flist *fl = NULL;
- size_t i, flsz = 0, phase = 0, excl;
- int rc = 0, c;
- int32_t idx;
- struct pollfd pfd[3];
- struct send_dlq sdlq;
- struct send_up up;
- struct stat st;
+ struct flist *fl = NULL;
+ const struct flist *f;
+ size_t i, flsz = 0, phase = 0, excl;
+ off_t sz;
+ int rc = 0, c;
+ int32_t idx;
+ struct pollfd pfd[3];
+ struct send_dlq sdlq;
+ struct send_dl *dl;
+ struct send_up up;
+ struct stat st;
+ unsigned char filemd[MD4_DIGEST_LENGTH];
+ void *wbuf = NULL;
+ size_t wbufpos = 0, pos, wbufsz = 0, wbufmax = 0;
+ ssize_t ssz;
if (pledge("stdio getpw rpath unveil", NULL) == -1) {
ERR(sess, "pledge");
@@ -260,8 +270,6 @@ rsync_sender(struct sess *sess, int fdin,
pfd[2].fd = -1; /* from local file */
pfd[2].events = POLLIN;
- /* The main sender loop runs into phase == 2. */
-
for (;;) {
assert(pfd[0].fd != -1);
if ((c = poll(pfd, 3, POLL_TIMEOUT)) == -1) {
@@ -271,7 +279,6 @@ rsync_sender(struct sess *sess, int fdin,
ERRX(sess, "poll: timeout");
goto out;
}
-
for (i = 0; i < 3; i++)
if (pfd[i].revents & (POLLERR|POLLNVAL)) {
ERRX(sess, "poll: bad fd");
@@ -281,24 +288,13 @@ rsync_sender(struct sess *sess, int fdin,
goto out;
}
- /*
- * Flush out multiplexed messages.
- * These might otherwise clog the reader.
- */
-
- if (sess->mplex_reads &&
- (POLLIN & pfd[0].revents)) {
- if (!io_read_flush(sess, fdin)) {
- ERRX1(sess, "io_read_flush");
- goto out;
- } else if (sess->mplex_read_remain == 0)
- pfd[0].revents &= ~POLLIN;
- }
-
/*
* If we have a request coming down off the wire, pull
* it in as quickly as possible into our buffer.
* This unclogs the socket buffers so the data can flow.
+ * FIXME: if we're multiplexing, we might stall here if
+ * there's only a log message and no actual data.
+ * This can be fixed by doing a conditional test.
*/
if (pfd[0].revents & POLLIN)
@@ -321,8 +317,8 @@ rsync_sender(struct sess *sess, int fdin,
}
/*
- * One of our local files has been opened (in response
- * to a receiver request) and now we can map it.
+ * One of our local files has been opened in response
+ * to a receiver request and now we can map it.
* We'll respond to the event by looking at the map when
* the writer is available.
* Here we also enable the poll event for output.
@@ -333,70 +329,188 @@ rsync_sender(struct sess *sess, int fdin,
assert(up.stat.fd != -1);
assert(up.stat.map == MAP_FAILED);
assert(up.stat.mapsz == 0);
+ f = &fl[up.cur->idx];
if (fstat(up.stat.fd, &st) == -1) {
- ERR(sess, "%s: fstat", fl[up.cur->idx].path);
+ ERR(sess, "%s: fstat", f->path);
goto out;
}
/*
* If the file is zero-length, the map will
* fail, but either way we want to unset that
- * we're waiting for the file to open.
- * We'll close the descriptor after processing.
+ * we're waiting for the file to open and set
+ * that we're ready for the output channel.
*/
if ((up.stat.mapsz = st.st_size) > 0) {
- up.stat.map = mmap(NULL, up.stat.mapsz,
- PROT_READ, MAP_SHARED, up.stat.fd, 0);
+ up.stat.map = mmap(NULL,
+ up.stat.mapsz, PROT_READ,
+ MAP_SHARED, up.stat.fd, 0);
if (up.stat.map == MAP_FAILED) {
- ERR(sess, "%s: mmap", fl[up.cur->idx].path);
+ ERR(sess, "%s: mmap", f->path);
goto out;
}
- }
+ }
+
pfd[2].fd = -1;
pfd[1].fd = fdout;
}
/*
- * Our outbound is ready to process the current event.
- * This means we've already opened the file and possibly
- * mapped it, and we're ready to send blocks.
- * Do this one block at a time lest we block the channel
- * while read events are coming in.
+ * If we have buffers waiting to write, write them out
+ * as soon as we can in a non-blocking fashion.
+ * We must not be waiting for any local files.
+ * ALL WRITES MUST HAPPEN HERE.
+ * This keeps the sender deadlock-free.
*/
+ if ((pfd[1].revents & POLLOUT) && wbufsz > 0) {
+ assert(pfd[2].fd == -1);
+ assert(wbufsz - wbufpos);
+ ssz = write(fdout,
+ wbuf + wbufpos, wbufsz - wbufpos);
+ if (ssz < 0) {
+ ERR(sess, "write");
+ goto out;
+ }
+ wbufpos += ssz;
+ if (wbufpos == wbufsz)
+ wbufpos = wbufsz = 0;
+ pfd[1].revents &= ~POLLOUT;
+
+ /* This is usually in io.c... */
+
+ sess->total_write += ssz;
+ }
+
if (pfd[1].revents & POLLOUT) {
- assert(up.cur != NULL);
assert(pfd[2].fd == -1);
+ assert(0 == wbufpos && 0 == wbufsz);
/*
- * If we receive an invalid index (-1), then we're
- * either promoted to the second phase or it's time to
- * exit, depending upon which phase we're in.
+ * If we have data to write, do it now according
+ * to the data finite state machine.
+ * If we receive an invalid index (-1), then
+ * we're either promoted to the second phase or
+ * it's time to exit, depending upon which phase
+ * we're in.
* Otherwise, we either start a transfer
* sequence (if not primed) or continue one.
*/
- if (up.cur->idx < 0) {
- pfd[1].fd = -1;
+ pos = 0;
+ if (BLKSTAT_DATA == up.stat.curst) {
+ /*
+ * A data segment to be written: buffer
+ * both the length and the data, then
+ * put is in the token phase.
+ */
+
+ sz = MIN(MAX_CHUNK,
+ up.stat.curlen - up.stat.curpos);
+ if (!io_lowbuffer_alloc(sess, &wbuf,
+ &wbufsz, &wbufmax, sizeof(int32_t))) {
+ ERRX1(sess, "io_lowbuffer_alloc");
+ goto out;
+ }
+ io_lowbuffer_int(sess,
+ wbuf, &pos, wbufsz, sz);
+ if (!io_lowbuffer_alloc(sess, &wbuf,
+ &wbufsz, &wbufmax, sz)) {
+ ERRX1(sess, "io_lowbuffer_alloc");
+ goto out;
+ }
+ io_lowbuffer_buf(sess, wbuf, &pos, wbufsz,
+ up.stat.map + up.stat.curpos, sz);
+ up.stat.curpos += sz;
+ if (up.stat.curpos == up.stat.curlen)
+ up.stat.curst = BLKSTAT_TOK;
+ } else if (BLKSTAT_TOK == up.stat.curst) {
+ /*
+ * The data token following (maybe) a
+ * data segment.
+ * These can also come standalone if,
+ * say, the file's being fully written.
+ * It's followed by a hash or another
+ * data segment, depending on the token.
+ */
+
+ if (!io_lowbuffer_alloc(sess, &wbuf,
+ &wbufsz, &wbufmax, sizeof(int32_t))) {
+ ERRX1(sess, "io_lowbuffer_alloc");
+ goto out;
+ }
+ io_lowbuffer_int(sess, wbuf,
+ &pos, wbufsz, up.stat.curtok);
+ up.stat.curst = up.stat.curtok ?
+ BLKSTAT_NONE : BLKSTAT_HASH;
+ } else if (BLKSTAT_HASH == up.stat.curst) {
+ /*
+ * The hash following transmission of
+ * all file contents.
+ * This is always followed by the state
+ * that we're finished with the file.
+ */
+
+ hash_file(up.stat.map,
+ up.stat.mapsz, filemd, sess);
+ if (!io_lowbuffer_alloc(sess, &wbuf,
+ &wbufsz, &wbufmax, MD4_DIGEST_LENGTH)) {
+ ERRX1(sess, "io_lowbuffer_alloc");
+ goto out;
+ }
+ io_lowbuffer_buf(sess, wbuf, &pos,
+ wbufsz, filemd, MD4_DIGEST_LENGTH);
+ up.stat.curst = BLKSTAT_DONE;
+ } else if (BLKSTAT_DONE == up.stat.curst) {
+ /*
+ * The data has been written.
+ * Clear our current send file and allow
+ * the block below to find another.
+ */
+
+ LOG3(sess, "%s: flushed %jd KB total, "
+ "%.2f%% uploaded",
+ fl[up.cur->idx].path,
+ (intmax_t)up.stat.total / 1024,
+ 100.0 * up.stat.dirty / up.stat.total);
send_up_reset(&up);
+ } else if (NULL != up.cur && up.cur->idx < 0) {
+ /*
+ * We've hit the phase change following
+ * the last file (or start, or prior
+ * phase change).
+ * Simply acknowledge it.
+ * FIXME: use buffering.
+ */
+
if (!io_write_int(sess, fdout, -1)) {
ERRX1(sess, "io_write_int");
goto out;
}
-
- /* Send superfluous ack. */
-
if (sess->opts->server && sess->rver > 27 &&
!io_write_int(sess, fdout, -1)) {
ERRX1(sess, "io_write_int");
goto out;
}
+ send_up_reset(&up);
+
+ /*
+ * This is where we actually stop the
+ * algorithm: we're already at the
+ * second phase.
+ */
if (phase++)
break;
- } else if (0 == up.primed) {
+ } else if (NULL != up.cur && 0 == up.primed) {
+ /*
+ * We're getting ready to send the file
+ * contents to the receiver.
+ * FIXME: use buffering.
+ */
+
if (!sess->opts->server)
LOG1(sess, "%s", fl[up.cur->idx].wpath);
@@ -416,18 +530,23 @@ rsync_sender(struct sess *sess, int fdin,
ERRX1(sess, "blk_recv_ack");
goto out;
}
+ LOG3(sess, "%s: primed for %jd B total",
+ fl[up.cur->idx].path,
+ (intmax_t)up.cur->blks->size);
up.primed = 1;
- } else {
+ } else if (NULL != up.cur) {
+ /*
+ * Our last case: we need to find the
+ * next block (and token) to transmit to
+ * the receiver.
+ * These will drive the finite state
+ * machine in the first few conditional
+ * blocks of this set.
+ */
+
assert(up.stat.fd != -1);
- c = blk_match(sess, fdout, up.cur->blks,
+ blk_match(sess, up.cur->blks,
fl[up.cur->idx].path, &up.stat);
- if (c < 0) {
- ERRX1(sess, "blk_match");
- goto out;
- } else if (c > 0) {
- send_up_reset(&up);
- pfd[1].fd = -1;
- }
}
}
@@ -443,21 +562,38 @@ rsync_sender(struct sess *sess, int fdin,
assert(up.stat.fd == -1);
assert(up.stat.map == MAP_FAILED);
assert(up.stat.mapsz == 0);
+ assert(wbufsz == 0 && wbufpos == 0);
+ pfd[1].fd = -1;
+
+ /*
+ * If there's nothing in the queue, then keep
+ * the output channel disabled and wait for
+ * whatever comes next from the reader.
+ */
if ((up.cur = TAILQ_FIRST(&sdlq)) == NULL)
continue;
+
TAILQ_REMOVE(&sdlq, up.cur, entries);
- /* End of phase: enable channel to receiver. */
+ /*
+ * End of phase: enable channel to receiver.
+ * We'll need our output buffer enabled in order
+ * to process this event.
+ */
if (up.cur->idx == -1) {
pfd[1].fd = fdout;
continue;
}
- /* Non-blocking open of file. */
+ /*
+ * Non-blocking open of file.
+ * This will be picked up in the state machine
+ * block of not being primed.
+ */
- up.stat.fd = open(fl[up.cur->idx].path,
+ up.stat.fd = open(fl[up.cur->idx].path,
O_RDONLY|O_NONBLOCK, 0);
if (up.stat.fd == -1) {
ERR(sess, "%s: open", fl[up.cur->idx].path);
@@ -467,6 +603,11 @@ rsync_sender(struct sess *sess, int fdin,
}
}
+ if (!TAILQ_EMPTY(&sdlq)) {
+ ERRX(sess, "phases complete with files still queued");
+ goto out;
+ }
+
if (!sess_stats_send(sess, fdout)) {
ERRX1(sess, "sess_stats_end");
goto out;
@@ -485,6 +626,12 @@ rsync_sender(struct sess *sess, int fdin,
LOG2(sess, "sender finished updating");
rc = 1;
out:
+ send_up_reset(&up);
+ while ((dl = TAILQ_FIRST(&sdlq)) != NULL) {
+ free(dl->blks);
+ free(dl);
+ }
flist_free(fl, flsz);
+ free(wbuf);
return rc;
}