diff options
author | Florian Obser <florian@cvs.openbsd.org> | 2019-02-16 16:58:40 +0000 |
---|---|---|
committer | Florian Obser <florian@cvs.openbsd.org> | 2019-02-16 16:58:40 +0000 |
commit | 61713811184ecd5d663058b78ad75321ae689eaf (patch) | |
tree | f2d2bd40f77c0237311cfbe3c9a08d2c59f64c7b | |
parent | 9f162b46faa21e0ac99e0dde04178f5f7d0c1cbb (diff) |
sync with kristaps, commit 1842d31d600f2834ae78fe0d99a29519a853d75c
Make sender mostly nonblocking for writes. This takes a lot of logic once
in blocks.c and puts it directly into sender.c. It allows running openrsync
against itself without deadlocks.
-rw-r--r-- | usr.bin/rsync/blocks.c | 206 | ||||
-rw-r--r-- | usr.bin/rsync/extern.h | 48 | ||||
-rw-r--r-- | usr.bin/rsync/io.c | 8 | ||||
-rw-r--r-- | usr.bin/rsync/sender.c | 271 |
4 files changed, 312 insertions, 221 deletions
diff --git a/usr.bin/rsync/blocks.c b/usr.bin/rsync/blocks.c index 2849945d9c0..5ba80469634 100644 --- a/usr.bin/rsync/blocks.c +++ b/usr.bin/rsync/blocks.c @@ -1,4 +1,4 @@ -/* $Id: blocks.c,v 1.8 2019/02/16 16:57:48 florian Exp $ */ +/* $Id: blocks.c,v 1.9 2019/02/16 16:58:39 florian Exp $ */ /* * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv> * @@ -30,38 +30,6 @@ #include "extern.h" /* - * Flush out "size" bytes of the buffer, doing all of the appropriate - * chunking of the data, then the subsequent token (or zero). - * Return zero on failure, non-zero on success. - */ -static int -blk_flush(struct sess *sess, int fd, - const void *b, off_t size, int32_t token) -{ - off_t i = 0, sz; - - while (i < size) { - sz = MAX_CHUNK < (size - i) ? - MAX_CHUNK : (size - i); - if (!io_write_int(sess, fd, sz)) { - ERRX1(sess, "io_write_int"); - return 0; - } else if (!io_write_buf(sess, fd, b + i, sz)) { - ERRX1(sess, "io_write_buf"); - return 0; - } - i += sz; - } - - if (!io_write_int(sess, fd, token)) { - ERRX1(sess, "io_write_int"); - return 0; - } - - return 1; -} - -/* * From our current position of "offs" in buffer "buf" of total size * "size", see if we can find a matching block in our list of blocks. * The "hint" refers to the block that *might* work. @@ -146,99 +114,18 @@ blk_find(struct sess *sess, const void *buf, off_t size, off_t offs, } /* - * The main reconstruction algorithm on the sender side. - * This is reentrant: it's meant to be called whenever "fd" unblocks for - * writing by the sender. - * Scans byte-wise over the input file, looking for matching blocks in - * what the server sent us. - * If a block is found, emit all data up until the block, then the token - * for the block. - * The receiving end can then reconstruct the file trivially. - * Return zero on failure, non-zero on success. - */ -static int -blk_match_send(struct sess *sess, const char *path, int fd, - const struct blkset *blks, struct blkstat *st) -{ - off_t last, end, sz; - int32_t tok; - struct blk *blk; - - /* - * Stop searching at the length of the file minus the size of - * the last block. - * The reason for this being that we don't need to do an - * incremental hash within the last block---if it doesn't match, - * it doesn't match. - */ - - end = st->mapsz + 1 - blks->blks[blks->blksz - 1].len; - last = st->offs; - - for ( ; st->offs < end; st->offs++) { - blk = blk_find(sess, st->map, st->mapsz, - st->offs, blks, path, st->hint); - if (blk == NULL) - continue; - - sz = st->offs - last; - st->dirty += sz; - st->total += sz; - LOG4(sess, "%s: flushing %jd B before %zu B " - "block %zu", path, (intmax_t)sz, blk->len, blk->idx); - tok = -(blk->idx + 1); - - /* - * Write the data we have, then follow it with the tag - * of the block that matches. - * The receiver will then write our data, then the data - * it already has in the matching block. - */ - - if (!blk_flush(sess, fd, st->map + last, sz, tok)) { - ERRX1(sess, "blk_flush"); - return -1; - } - - st->total += blk->len; - st->offs += blk->len; - st->hint = blk->idx + 1; - return 0; - } - - /* Emit remaining data and send terminator token. */ - - sz = st->mapsz - last; - st->total += sz; - st->dirty += sz; - - LOG4(sess, "%s: flushing remaining %jd B", path, (intmax_t)sz); - - if (!blk_flush(sess, fd, st->map + last, sz, 0)) { - ERRX1(sess, "blk_flush"); - return -1; - } - - LOG3(sess, "%s: flushed (chunked) %jd B total, " - "%.2f%% upload ratio", path, (intmax_t)st->total, - 100.0 * st->dirty / st->total); - return 1; -} - -/* * Given a local file "path" and the blocks created by a remote machine, * find out which blocks of our file they don't have and send them. * This function is reentrant: it must be called while there's still * data to send. - * Return 0 if there's more data to send, >0 if the file has completed - * its update, or <0 on error. */ -int -blk_match(struct sess *sess, int fd, const struct blkset *blks, +void +blk_match(struct sess *sess, const struct blkset *blks, const char *path, struct blkstat *st) { - unsigned char filemd[MD4_DIGEST_LENGTH]; - int c; + off_t last, end, sz; + int32_t tok; + struct blk *blk; /* * If the file's empty or we don't have any blocks from the @@ -248,34 +135,69 @@ blk_match(struct sess *sess, int fd, const struct blkset *blks, */ if (st->mapsz && blks->blksz) { - if ((c = blk_match_send(sess, path, fd, blks, st)) < 0) { - ERRX1(sess, "blk_match_send"); - return -1; - } else if (c == 0) - return 0; - } else { - if (!blk_flush(sess, fd, st->map, st->mapsz, 0)) { - ERRX1(sess, "blk_flush"); - return -1; + /* + * Stop searching at the length of the file minus the + * size of the last block. + * The reason for this being that we don't need to do an + * incremental hash within the last block---if it + * doesn't match, it doesn't match. + */ + + end = st->mapsz + 1 - blks->blks[blks->blksz - 1].len; + last = st->offs; + + for ( ; st->offs < end; st->offs++) { + blk = blk_find(sess, st->map, st->mapsz, + st->offs, blks, path, st->hint); + if (blk == NULL) + continue; + + sz = st->offs - last; + st->dirty += sz; + st->total += sz; + LOG4(sess, "%s: flushing %jd B before %zu B " + "block %zu", path, (intmax_t)sz, + blk->len, blk->idx); + tok = -(blk->idx + 1); + + /* + * Write the data we have, then follow it with + * the tag of the block that matches. + */ + + st->curpos = last; + st->curlen = st->curpos + sz; + st->curtok = tok; + assert(0 != st->curtok); + st->curst = sz ? BLKSTAT_DATA : BLKSTAT_TOK; + st->total += blk->len; + st->offs += blk->len; + st->hint = blk->idx + 1; + return; } - LOG3(sess, "%s: flushed (un-chunked) %jd B, 100%% upload ratio", - path, (intmax_t)st->mapsz); - } - /* - * Now write the full file hash. - * Since we're seeding the hash, this always gives us some sort - * of data even if the file's zero-length. - */ + /* Emit remaining data and send terminator token. */ - hash_file(st->map, st->mapsz, filemd, sess); + sz = st->mapsz - last; + LOG4(sess, "%s: flushing remaining %jd B", + path, (intmax_t)sz); - if (!io_write_buf(sess, fd, filemd, MD4_DIGEST_LENGTH)) { - ERRX1(sess, "io_write_buf"); - return -1; + st->total += sz; + st->dirty += sz; + st->curpos = last; + st->curlen = st->curpos + sz; + st->curtok = 0; + st->curst = sz ? BLKSTAT_DATA : BLKSTAT_TOK; + } else { + st->curpos = 0; + st->curlen = st->mapsz; + st->curtok = 0; + st->curst = st->mapsz ? BLKSTAT_DATA : BLKSTAT_TOK; + st->dirty = st->total = st->mapsz; + + LOG4(sess, "%s: flushing whole file %zu B", + path, st->mapsz); } - - return 1; } /* diff --git a/usr.bin/rsync/extern.h b/usr.bin/rsync/extern.h index 2fb34629fa7..c8cf4012c26 100644 --- a/usr.bin/rsync/extern.h +++ b/usr.bin/rsync/extern.h @@ -1,4 +1,4 @@ -/* $Id: extern.h,v 1.13 2019/02/16 16:57:17 florian Exp $ */ +/* $Id: extern.h,v 1.14 2019/02/16 16:58:39 florian Exp $ */ /* * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv> * @@ -41,7 +41,11 @@ #define CSUM_LENGTH_PHASE1 (2) #define CSUM_LENGTH_PHASE2 (16) -#define POLL_TIMEOUT (10000) +/* + * Use this for debugging deadlocks. + * All poll events will use it and catch time-outs. + */ +#define POLL_TIMEOUT (INFTIM) /* * Operating mode for a client or a server. @@ -128,17 +132,30 @@ struct blk { unsigned char chksum_long[CSUM_LENGTH_PHASE2]; /* slow checksum */ }; +enum blkstatst { + BLKSTAT_NONE = 0, + BLKSTAT_DATASZ, + BLKSTAT_DATA, + BLKSTAT_TOK, + BLKSTAT_HASH, + BLKSTAT_DONE +}; + /* * Information for the sender updating receiver blocks reentrantly. */ struct blkstat { - off_t offs; /* position in sender file */ - off_t total; /* total amount processed */ - off_t dirty; /* total amount sent */ - size_t hint; /* optimisation: next probable block match */ - void *map; /* mapped file or MAP_FAILED otherwise */ - size_t mapsz; /* size of file or zero */ - int fd; /* descriptor girding the map */ + off_t offs; /* position in sender file */ + off_t total; /* total amount processed */ + off_t dirty; /* total amount sent */ + size_t hint; /* optimisation: next probable match */ + void *map; /* mapped file or MAP_FAILED otherwise */ + size_t mapsz; /* size of file or zero */ + int fd; /* descriptor girding the map */ + enum blkstatst curst; /* FSM for sending file blocks */ + off_t curpos; /* sending: position in file to send */ + off_t curlen; /* sending: length of send */ + int32_t curtok; /* sending: next matching token or zero */ }; /* @@ -182,6 +199,10 @@ struct ident { struct download; struct upload; +#ifndef MIN +# define MIN(_x1, _x2) ((_x1) < (_x2) ? (_x1) : (_x2)) +#endif + #define LOG0(_sess, _fmt, ...) \ rsync_log((_sess), __FILE__, __LINE__, -1, (_fmt), ##__VA_ARGS__) #define LOG1(_sess, _fmt, ...) \ @@ -262,6 +283,13 @@ int io_write_int(struct sess *, int, int32_t); int io_write_line(struct sess *, int, const char *); int io_write_long(struct sess *, int, int64_t); +int io_lowbuffer_alloc(struct sess *, void **, + size_t *, size_t *, size_t); +void io_lowbuffer_int(struct sess *, void *, + size_t *, size_t, int32_t); +void io_lowbuffer_buf(struct sess *, void *, + size_t *, size_t, const void *, size_t); + void io_buffer_int(struct sess *, void *, size_t *, size_t, int32_t); void io_buffer_buf(struct sess *, void *, @@ -300,7 +328,7 @@ void upload_free(struct upload *); struct blkset *blk_recv(struct sess *, int, const char *); int blk_recv_ack(struct sess *, int, const struct blkset *, int32_t); -int blk_match(struct sess *, int, const struct blkset *, +void blk_match(struct sess *, const struct blkset *, const char *, struct blkstat *); int blk_send(struct sess *, int, size_t, const struct blkset *, const char *); diff --git a/usr.bin/rsync/io.c b/usr.bin/rsync/io.c index 7c4509df5b3..a8dce4d7d2a 100644 --- a/usr.bin/rsync/io.c +++ b/usr.bin/rsync/io.c @@ -1,4 +1,4 @@ -/* $Id: io.c,v 1.7 2019/02/16 16:58:14 florian Exp $ */ +/* $Id: io.c,v 1.8 2019/02/16 16:58:39 florian Exp $ */ /* * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv> * @@ -29,12 +29,6 @@ #include "extern.h" /* - * Use this for debugging deadlocks. - * All poll events will use it and catch time-outs. - */ -#define POLL_TIMEOUT (INFTIM) - -/* * A non-blocking check to see whether there's POLLIN data in fd. * Returns <0 on failure, 0 if there's no data, >0 if there is. */ diff --git a/usr.bin/rsync/sender.c b/usr.bin/rsync/sender.c index 2bd1449a6b2..7f527f741f5 100644 --- a/usr.bin/rsync/sender.c +++ b/usr.bin/rsync/sender.c @@ -1,4 +1,4 @@ -/* $Id: sender.c,v 1.8 2019/02/16 16:57:17 florian Exp $ */ +/* $Id: sender.c,v 1.9 2019/02/16 16:58:39 florian Exp $ */ /* * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv> * @@ -26,6 +26,8 @@ #include <string.h> #include <unistd.h> +#include <openssl/md4.h> + #include "extern.h" /* @@ -84,6 +86,7 @@ send_up_reset(struct send_up *p) p->stat.offs = 0; p->stat.hint = 0; + p->stat.curst = BLKSTAT_NONE; p->primed = 0; } @@ -172,14 +175,21 @@ int rsync_sender(struct sess *sess, int fdin, int fdout, size_t argc, char **argv) { - struct flist *fl = NULL; - size_t i, flsz = 0, phase = 0, excl; - int rc = 0, c; - int32_t idx; - struct pollfd pfd[3]; - struct send_dlq sdlq; - struct send_up up; - struct stat st; + struct flist *fl = NULL; + const struct flist *f; + size_t i, flsz = 0, phase = 0, excl; + off_t sz; + int rc = 0, c; + int32_t idx; + struct pollfd pfd[3]; + struct send_dlq sdlq; + struct send_dl *dl; + struct send_up up; + struct stat st; + unsigned char filemd[MD4_DIGEST_LENGTH]; + void *wbuf = NULL; + size_t wbufpos = 0, pos, wbufsz = 0, wbufmax = 0; + ssize_t ssz; if (pledge("stdio getpw rpath unveil", NULL) == -1) { ERR(sess, "pledge"); @@ -260,8 +270,6 @@ rsync_sender(struct sess *sess, int fdin, pfd[2].fd = -1; /* from local file */ pfd[2].events = POLLIN; - /* The main sender loop runs into phase == 2. */ - for (;;) { assert(pfd[0].fd != -1); if ((c = poll(pfd, 3, POLL_TIMEOUT)) == -1) { @@ -271,7 +279,6 @@ rsync_sender(struct sess *sess, int fdin, ERRX(sess, "poll: timeout"); goto out; } - for (i = 0; i < 3; i++) if (pfd[i].revents & (POLLERR|POLLNVAL)) { ERRX(sess, "poll: bad fd"); @@ -281,24 +288,13 @@ rsync_sender(struct sess *sess, int fdin, goto out; } - /* - * Flush out multiplexed messages. - * These might otherwise clog the reader. - */ - - if (sess->mplex_reads && - (POLLIN & pfd[0].revents)) { - if (!io_read_flush(sess, fdin)) { - ERRX1(sess, "io_read_flush"); - goto out; - } else if (sess->mplex_read_remain == 0) - pfd[0].revents &= ~POLLIN; - } - /* * If we have a request coming down off the wire, pull * it in as quickly as possible into our buffer. * This unclogs the socket buffers so the data can flow. + * FIXME: if we're multiplexing, we might stall here if + * there's only a log message and no actual data. + * This can be fixed by doing a conditional test. */ if (pfd[0].revents & POLLIN) @@ -321,8 +317,8 @@ rsync_sender(struct sess *sess, int fdin, } /* - * One of our local files has been opened (in response - * to a receiver request) and now we can map it. + * One of our local files has been opened in response + * to a receiver request and now we can map it. * We'll respond to the event by looking at the map when * the writer is available. * Here we also enable the poll event for output. @@ -333,70 +329,188 @@ rsync_sender(struct sess *sess, int fdin, assert(up.stat.fd != -1); assert(up.stat.map == MAP_FAILED); assert(up.stat.mapsz == 0); + f = &fl[up.cur->idx]; if (fstat(up.stat.fd, &st) == -1) { - ERR(sess, "%s: fstat", fl[up.cur->idx].path); + ERR(sess, "%s: fstat", f->path); goto out; } /* * If the file is zero-length, the map will * fail, but either way we want to unset that - * we're waiting for the file to open. - * We'll close the descriptor after processing. + * we're waiting for the file to open and set + * that we're ready for the output channel. */ if ((up.stat.mapsz = st.st_size) > 0) { - up.stat.map = mmap(NULL, up.stat.mapsz, - PROT_READ, MAP_SHARED, up.stat.fd, 0); + up.stat.map = mmap(NULL, + up.stat.mapsz, PROT_READ, + MAP_SHARED, up.stat.fd, 0); if (up.stat.map == MAP_FAILED) { - ERR(sess, "%s: mmap", fl[up.cur->idx].path); + ERR(sess, "%s: mmap", f->path); goto out; } - } + } + pfd[2].fd = -1; pfd[1].fd = fdout; } /* - * Our outbound is ready to process the current event. - * This means we've already opened the file and possibly - * mapped it, and we're ready to send blocks. - * Do this one block at a time lest we block the channel - * while read events are coming in. + * If we have buffers waiting to write, write them out + * as soon as we can in a non-blocking fashion. + * We must not be waiting for any local files. + * ALL WRITES MUST HAPPEN HERE. + * This keeps the sender deadlock-free. */ + if ((pfd[1].revents & POLLOUT) && wbufsz > 0) { + assert(pfd[2].fd == -1); + assert(wbufsz - wbufpos); + ssz = write(fdout, + wbuf + wbufpos, wbufsz - wbufpos); + if (ssz < 0) { + ERR(sess, "write"); + goto out; + } + wbufpos += ssz; + if (wbufpos == wbufsz) + wbufpos = wbufsz = 0; + pfd[1].revents &= ~POLLOUT; + + /* This is usually in io.c... */ + + sess->total_write += ssz; + } + if (pfd[1].revents & POLLOUT) { - assert(up.cur != NULL); assert(pfd[2].fd == -1); + assert(0 == wbufpos && 0 == wbufsz); /* - * If we receive an invalid index (-1), then we're - * either promoted to the second phase or it's time to - * exit, depending upon which phase we're in. + * If we have data to write, do it now according + * to the data finite state machine. + * If we receive an invalid index (-1), then + * we're either promoted to the second phase or + * it's time to exit, depending upon which phase + * we're in. * Otherwise, we either start a transfer * sequence (if not primed) or continue one. */ - if (up.cur->idx < 0) { - pfd[1].fd = -1; + pos = 0; + if (BLKSTAT_DATA == up.stat.curst) { + /* + * A data segment to be written: buffer + * both the length and the data, then + * put is in the token phase. + */ + + sz = MIN(MAX_CHUNK, + up.stat.curlen - up.stat.curpos); + if (!io_lowbuffer_alloc(sess, &wbuf, + &wbufsz, &wbufmax, sizeof(int32_t))) { + ERRX1(sess, "io_lowbuffer_alloc"); + goto out; + } + io_lowbuffer_int(sess, + wbuf, &pos, wbufsz, sz); + if (!io_lowbuffer_alloc(sess, &wbuf, + &wbufsz, &wbufmax, sz)) { + ERRX1(sess, "io_lowbuffer_alloc"); + goto out; + } + io_lowbuffer_buf(sess, wbuf, &pos, wbufsz, + up.stat.map + up.stat.curpos, sz); + up.stat.curpos += sz; + if (up.stat.curpos == up.stat.curlen) + up.stat.curst = BLKSTAT_TOK; + } else if (BLKSTAT_TOK == up.stat.curst) { + /* + * The data token following (maybe) a + * data segment. + * These can also come standalone if, + * say, the file's being fully written. + * It's followed by a hash or another + * data segment, depending on the token. + */ + + if (!io_lowbuffer_alloc(sess, &wbuf, + &wbufsz, &wbufmax, sizeof(int32_t))) { + ERRX1(sess, "io_lowbuffer_alloc"); + goto out; + } + io_lowbuffer_int(sess, wbuf, + &pos, wbufsz, up.stat.curtok); + up.stat.curst = up.stat.curtok ? + BLKSTAT_NONE : BLKSTAT_HASH; + } else if (BLKSTAT_HASH == up.stat.curst) { + /* + * The hash following transmission of + * all file contents. + * This is always followed by the state + * that we're finished with the file. + */ + + hash_file(up.stat.map, + up.stat.mapsz, filemd, sess); + if (!io_lowbuffer_alloc(sess, &wbuf, + &wbufsz, &wbufmax, MD4_DIGEST_LENGTH)) { + ERRX1(sess, "io_lowbuffer_alloc"); + goto out; + } + io_lowbuffer_buf(sess, wbuf, &pos, + wbufsz, filemd, MD4_DIGEST_LENGTH); + up.stat.curst = BLKSTAT_DONE; + } else if (BLKSTAT_DONE == up.stat.curst) { + /* + * The data has been written. + * Clear our current send file and allow + * the block below to find another. + */ + + LOG3(sess, "%s: flushed %jd KB total, " + "%.2f%% uploaded", + fl[up.cur->idx].path, + (intmax_t)up.stat.total / 1024, + 100.0 * up.stat.dirty / up.stat.total); send_up_reset(&up); + } else if (NULL != up.cur && up.cur->idx < 0) { + /* + * We've hit the phase change following + * the last file (or start, or prior + * phase change). + * Simply acknowledge it. + * FIXME: use buffering. + */ + if (!io_write_int(sess, fdout, -1)) { ERRX1(sess, "io_write_int"); goto out; } - - /* Send superfluous ack. */ - if (sess->opts->server && sess->rver > 27 && !io_write_int(sess, fdout, -1)) { ERRX1(sess, "io_write_int"); goto out; } + send_up_reset(&up); + + /* + * This is where we actually stop the + * algorithm: we're already at the + * second phase. + */ if (phase++) break; - } else if (0 == up.primed) { + } else if (NULL != up.cur && 0 == up.primed) { + /* + * We're getting ready to send the file + * contents to the receiver. + * FIXME: use buffering. + */ + if (!sess->opts->server) LOG1(sess, "%s", fl[up.cur->idx].wpath); @@ -416,18 +530,23 @@ rsync_sender(struct sess *sess, int fdin, ERRX1(sess, "blk_recv_ack"); goto out; } + LOG3(sess, "%s: primed for %jd B total", + fl[up.cur->idx].path, + (intmax_t)up.cur->blks->size); up.primed = 1; - } else { + } else if (NULL != up.cur) { + /* + * Our last case: we need to find the + * next block (and token) to transmit to + * the receiver. + * These will drive the finite state + * machine in the first few conditional + * blocks of this set. + */ + assert(up.stat.fd != -1); - c = blk_match(sess, fdout, up.cur->blks, + blk_match(sess, up.cur->blks, fl[up.cur->idx].path, &up.stat); - if (c < 0) { - ERRX1(sess, "blk_match"); - goto out; - } else if (c > 0) { - send_up_reset(&up); - pfd[1].fd = -1; - } } } @@ -443,21 +562,38 @@ rsync_sender(struct sess *sess, int fdin, assert(up.stat.fd == -1); assert(up.stat.map == MAP_FAILED); assert(up.stat.mapsz == 0); + assert(wbufsz == 0 && wbufpos == 0); + pfd[1].fd = -1; + + /* + * If there's nothing in the queue, then keep + * the output channel disabled and wait for + * whatever comes next from the reader. + */ if ((up.cur = TAILQ_FIRST(&sdlq)) == NULL) continue; + TAILQ_REMOVE(&sdlq, up.cur, entries); - /* End of phase: enable channel to receiver. */ + /* + * End of phase: enable channel to receiver. + * We'll need our output buffer enabled in order + * to process this event. + */ if (up.cur->idx == -1) { pfd[1].fd = fdout; continue; } - /* Non-blocking open of file. */ + /* + * Non-blocking open of file. + * This will be picked up in the state machine + * block of not being primed. + */ - up.stat.fd = open(fl[up.cur->idx].path, + up.stat.fd = open(fl[up.cur->idx].path, O_RDONLY|O_NONBLOCK, 0); if (up.stat.fd == -1) { ERR(sess, "%s: open", fl[up.cur->idx].path); @@ -467,6 +603,11 @@ rsync_sender(struct sess *sess, int fdin, } } + if (!TAILQ_EMPTY(&sdlq)) { + ERRX(sess, "phases complete with files still queued"); + goto out; + } + if (!sess_stats_send(sess, fdout)) { ERRX1(sess, "sess_stats_end"); goto out; @@ -485,6 +626,12 @@ rsync_sender(struct sess *sess, int fdin, LOG2(sess, "sender finished updating"); rc = 1; out: + send_up_reset(&up); + while ((dl = TAILQ_FIRST(&sdlq)) != NULL) { + free(dl->blks); + free(dl); + } flist_free(fl, flsz); + free(wbuf); return rc; } |