diff options
Diffstat (limited to 'usr.bin/aucat/sock.c')
-rw-r--r-- | usr.bin/aucat/sock.c | 931 |
1 files changed, 931 insertions, 0 deletions
diff --git a/usr.bin/aucat/sock.c b/usr.bin/aucat/sock.c new file mode 100644 index 00000000000..7c9aecce3ea --- /dev/null +++ b/usr.bin/aucat/sock.c @@ -0,0 +1,931 @@ +/* $OpenBSD: sock.c,v 1.1 2008/10/26 08:49:44 ratchov Exp $ */ +/* + * Copyright (c) 2008 Alexandre Ratchov <alex@caoua.org> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ +/* + * TODO: + * + * change f->bufsz to contain only socket-side buffer, + * because it's less error prone + */ + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include "aproc.h" +#include "abuf.h" +#include "sock.h" +#include "dev.h" +#include "conf.h" + +int sock_attach(struct sock *, int); +int sock_read(struct sock *); +int sock_write(struct sock *); +int sock_execmsg(struct sock *); +void sock_reset(struct sock *); + +struct fileops sock_ops = { + "sock", + sizeof(struct sock), + pipe_close, + pipe_read, + pipe_write, + NULL, /* start */ + NULL, /* stop */ + pipe_nfds, + pipe_pollfd, + pipe_revents +}; + +void +rsock_done(struct aproc *p) +{ + struct sock *f = (struct sock *)p->u.io.file; + + DPRINTFN(1, "rsock_done: %p\n", f); + sock_reset(f); + f->pipe.file.rproc = NULL; + if (f->pipe.file.wproc) { + aproc_del(f->pipe.file.wproc); + file_del(&f->pipe.file); + } +} + +int +rsock_in(struct aproc *p, struct abuf *ibuf_dummy) +{ + struct sock *f = (struct sock *)p->u.io.file; + struct abuf *obuf; + + DPRINTFN(4, "rsock_in: %p\n", f); + + if (!sock_read(f)) + return 0; + obuf = LIST_FIRST(&p->obuflist); + if (obuf) { + if (!abuf_flush(obuf)) + return 0; + } + return 1; +} + +int +rsock_out(struct aproc *p, struct abuf *obuf) +{ + struct sock *f = (struct sock *)p->u.io.file; + + if (f->pipe.file.refs > 0) + return 0; + + DPRINTFN(4, "rsock_out: %p\n", f); + + /* + * when calling sock_read(), we may receive a ``STOP'' command, + * and detach ``obuf''. In this case, there's no more caller and + * we'll stop processing further messages, resulting in a dead lock. + * The solution is to iterate over sock_read() in order to + * consume all messages(). + */ + for (;;) { + if (!sock_read(f)) + return 0; + } + return 1; +} + +void +rsock_eof(struct aproc *p, struct abuf *ibuf_dummy) +{ + DPRINTFN(3, "rsock_eof: %p\n", p->u.io.file); + aproc_del(p); +} + +void +rsock_hup(struct aproc *p, struct abuf *ibuf) +{ + DPRINTFN(3, "rsock_hup: %p\n", p->u.io.file); + aproc_del(p); +} + +void +rsock_opos(struct aproc *p, struct abuf *obuf, int delta) +{ + struct sock *f = (struct sock *)p->u.io.file; + + f->odelta += delta; + DPRINTFN(3, "rsock_opos: %p: delta = %d, odelta = %d\n", + f, delta, f->odelta); + + /* + * negative deltas are xrun notifications for internal uses + * only. Dont generate a packet for this, the client will be + * notified later. + */ + if (delta <= 0) + return; + for (;;) { + if (!sock_write(f)) + break; + } +} + +struct aproc_ops rsock_ops = { + "rsock", + rsock_in, + rsock_out, + rsock_eof, + rsock_hup, + NULL, /* newin */ + NULL, /* newout */ + NULL, /* ipos */ + rsock_opos, + rsock_done +}; + +void +wsock_done(struct aproc *p) +{ + struct sock *f = (struct sock *)p->u.io.file; + + DPRINTFN(1, "wsock_done: %p\n", f); + sock_reset(f); + f->pipe.file.wproc = NULL; + if (f->pipe.file.rproc) { + aproc_del(f->pipe.file.rproc); + file_del(&f->pipe.file); + } +} + +int +wsock_in(struct aproc *p, struct abuf *ibuf) +{ + struct sock *f = (struct sock *)p->u.io.file; + + if (f->pipe.file.refs > 0) + return 0; + + DPRINTFN(4, "wsock_in: %p\n", f); + + /* + * see remark in rsock_out() + */ + for (;;) { + if (!sock_write(f)) + return 0; + } + return 1; +} + +int +wsock_out(struct aproc *p, struct abuf *obuf_dummy) +{ + struct abuf *ibuf = LIST_FIRST(&p->ibuflist); + struct sock *f = (struct sock *)p->u.io.file; + + DPRINTFN(3, "wsock_out: %p\n", f); + + if (ibuf) { + DPRINTFN(3, "wsock_out: %p, filling ibuf\n", f); + if (!abuf_fill(ibuf)) + return 0; + } + if (!sock_write(f)) + return 0; + return 1; +} + +void +wsock_eof(struct aproc *p, struct abuf *obuf) +{ + DPRINTFN(3, "wsock_eof: %p\n", p->u.io.file); + aproc_del(p); +} + +void +wsock_hup(struct aproc *p, struct abuf *obuf_dummy) +{ + DPRINTFN(3, "wsock_hup: %p\n", p->u.io.file); + aproc_del(p); +} + +void +wsock_ipos(struct aproc *p, struct abuf *obuf, int delta) +{ + struct sock *f = (struct sock *)p->u.io.file; + + f->idelta += delta; + DPRINTFN(3, "wsock_ipos: %p, delta = %d, odelta = %d\n", + f, delta, f->idelta); + + /* + * negative deltas are xrun notifications for internal uses + * only. Dont generate a packet for this, the client will be + * notified later. + */ + if (delta <= 0) + return; + for (;;) { + if (!sock_write(f)) + break; + } +} + +struct aproc_ops wsock_ops = { + "wsock", + wsock_in, + wsock_out, + wsock_eof, + wsock_hup, + NULL, /* newin */ + NULL, /* newout */ + wsock_ipos, + NULL, /* opos */ + wsock_done +}; + +/* + * initialise socket in the SOCK_INIT state with default + * parameters + */ +struct sock * +sock_new(struct fileops *ops, int fd, char *name) +{ + struct aproc *rproc, *wproc; + struct sock *f; + + f = (struct sock *)pipe_new(ops, fd, name); + f->pstate = SOCK_INIT; + f->mode = 0; + if (dev_rec) { + f->wpar = dev_ipar; + f->mode |= AMSG_REC; + } + if (dev_play) { + f->rpar = dev_opar; + f->mode |= AMSG_PLAY; + } + f->xrun = AMSG_IGNORE; + f->bufsz = 2 * dev_bufsz; + f->round = dev_round; + f->odelta = f->idelta = 0; + + wproc = aproc_new(&wsock_ops, name); + wproc->u.io.file = &f->pipe.file; + f->pipe.file.wproc = wproc; + f->wstate = SOCK_WIDLE; + f->wtodo = 0xdeadbeef; + + rproc = aproc_new(&rsock_ops, name); + rproc->u.io.file = &f->pipe.file; + f->pipe.file.rproc = rproc; + f->rstate = SOCK_RMSG; + f->rtodo = sizeof(struct amsg); + return f; +} + +/* + * free buffers + */ +void +sock_freebuf(struct sock *f) +{ + struct abuf *rbuf, *wbuf; + + f->pstate = SOCK_INIT; + DPRINTF("sock_freebuf:\n"); + rbuf = LIST_FIRST(&f->pipe.file.rproc->obuflist); + if (rbuf) + abuf_eof(rbuf); + wbuf = LIST_FIRST(&f->pipe.file.wproc->ibuflist); + if (wbuf) + abuf_hup(wbuf); +} + +/* + * allocate buffers, so client can start filling write-end. + */ +void +sock_allocbuf(struct sock *f) +{ + struct abuf *rbuf = NULL, *wbuf = NULL; + unsigned nfr = 0; + + if (f->mode & AMSG_PLAY) { + nfr = f->bufsz - dev_bufsz * f->rpar.rate / dev_rate; + rbuf = abuf_new(nfr, aparams_bpf(&f->rpar)); + aproc_setout(f->pipe.file.rproc, rbuf); + f->odelta = 0; + } + if (f->mode & AMSG_REC) { + nfr = f->bufsz - dev_bufsz * f->wpar.rate / dev_rate; + wbuf = abuf_new(nfr, aparams_bpf(&f->wpar)); + aproc_setin(f->pipe.file.wproc, wbuf); + f->idelta = 0; + } + + DPRINTF("sock_allocbuf: %p, using %u/%u frames buffer\n", + f, nfr, f->bufsz); + + f->pstate = SOCK_START; + if (!(f->mode & AMSG_PLAY)) + (void)sock_attach(f, 0); +} + +/* + * attach play and/or record buffers to dev_mix and/or dev_sub + */ +int +sock_attach(struct sock *f, int force) +{ + struct abuf *rbuf, *wbuf; + + rbuf = LIST_FIRST(&f->pipe.file.rproc->obuflist); + wbuf = LIST_FIRST(&f->pipe.file.wproc->ibuflist); + + /* + * if in SOCK_START state, dont attach until + * the buffer isn't completely filled + */ + if (!force && rbuf && ABUF_WOK(rbuf)) + return 0; + + DPRINTF("sock_attach: %p\n", f); + f->pstate = SOCK_RUN; + + /* + * attach them to the device + */ + dev_attach(f->pipe.file.name, + (f->mode & AMSG_PLAY) ? rbuf : NULL, &f->rpar, f->xrun, + (f->mode & AMSG_REC) ? wbuf : NULL, &f->wpar, f->xrun); + + /* + * send the initial position, if needed + */ + for (;;) { + if (!sock_write(f)) + break; + } + return 1; +} + +void +sock_reset(struct sock *f) +{ + switch (f->pstate) { + case SOCK_START: + (void)sock_attach(f, 1); + f->pstate = SOCK_RUN; + /* PASSTHROUGH */ + case SOCK_RUN: + sock_freebuf(f); + f->pstate = SOCK_INIT; + /* PASSTHROUGH */ + case SOCK_INIT: + /* nothing yet */ + break; + } +} + +/* + * read a message from the file descriptor, return 1 if done, 0 + * otherwise. The message is stored in f->rmsg + */ +int +sock_rmsg(struct sock *f) +{ + unsigned count; + unsigned char *data; + + while (f->rtodo > 0) { + if (!(f->pipe.file.state & FILE_ROK)) { + DPRINTFN(4, "sock_rmsg: blk, rtodo = %u\n", f->rtodo); + return 0; + } + data = (unsigned char *)&f->rmsg; + data += sizeof(struct amsg) - f->rtodo; + count = file_read(&f->pipe.file, data, f->rtodo); + if (count == 0) + return 0; + f->rtodo -= count; + } + DPRINTFN(4, "sock_rmsg: %p: done\n", f); + return 1; +} + +/* + * write a message to the file descriptor, return 1 if done, 0 + * otherwise. The "m" argument is f->rmsg or f->wmsg, and the "ptodo" + * points to the f->rtodo or f->wtodo respectively. + */ +int +sock_wmsg(struct sock *f, struct amsg *m, unsigned *ptodo) +{ + unsigned count; + unsigned char *data; + + while (*ptodo > 0) { + if (!(f->pipe.file.state & FILE_WOK)) { + DPRINTFN(4, "sock_wmsg: blk, *ptodo = %u\n", *ptodo); + return 0; + } + data = (unsigned char *)m; + data += sizeof(struct amsg) - *ptodo; + count = file_write(&f->pipe.file, data, *ptodo); + if (count == 0) + return 0; + *ptodo -= count; + } + DPRINTFN(4, "sock_wmsg: %p: done\n", f); + return 1; +} + +/* + * read data chunk from the file descriptor, return 1 if at least one + * byte was read, 0 if the file blocked. + */ +int +sock_rdata(struct sock *f) +{ + struct aproc *p; + struct abuf *obuf; + unsigned char *data; + unsigned count, n; + +#ifdef DEBUG + if (f->rtodo == 0) { + fprintf(stderr, "sock_rdata: bad call: zero arg\n"); + abort(); + } +#endif + p = f->pipe.file.rproc; + obuf = LIST_FIRST(&p->obuflist); + if (ABUF_FULL(obuf) || !(f->pipe.file.state & FILE_ROK)) + return 0; + data = abuf_wgetblk(obuf, &count, 0); + if (count > f->rtodo) + count = f->rtodo; + n = file_read(&f->pipe.file, data, count); + if (n == 0) + return 0; + abuf_wcommit(obuf, n); + f->rtodo -= n; + return 1; +} + +/* + * write data chunk to the file descriptor, return 1 if at least one + * byte was written, 0 if the file blocked. + */ +int +sock_wdata(struct sock *f) +{ + struct aproc *p; + struct abuf *ibuf; + unsigned char *data; + unsigned count, n; +#define ZERO_MAX 0x1000 + static char zero[ZERO_MAX]; + +#ifdef DEBUG + if (f->wtodo == 0) { + fprintf(stderr, "sock_wdata: bad call: zero arg\n"); + abort(); + } +#endif + if (!(f->pipe.file.state & FILE_WOK)) + return 0; + p = f->pipe.file.wproc; + ibuf = LIST_FIRST(&p->ibuflist); + if (ibuf) { + if (ABUF_EMPTY(ibuf)) + return 0; + data = abuf_rgetblk(ibuf, &count, 0); + if (count > f->wtodo) + count = f->wtodo; + n = file_write(&f->pipe.file, data, count); + if (n == 0) + return 0; + abuf_rdiscard(ibuf, n); + f->wtodo -= n; + } else { + /* + * there's no dev_detach() routine yet, + * so now we abruptly destroy the buffer. + * Until we implement dev_detach, complete + * the packet with zeros... + */ + count = ZERO_MAX; + if (count > f->wtodo) + count = f->wtodo; + n = file_write(&f->pipe.file, zero, count); + if (n == 0) + return 0; + f->wtodo -= n; + } + return 1; +} + +int +sock_setpar(struct sock *f) +{ + struct amsg_par *p = &f->rmsg.u.par; + unsigned minbuf, maxbuf; + + if (AMSG_ISSET(p->mode)) { + if ((p->mode & ~(AMSG_PLAY | AMSG_REC)) || p->mode == 0) { + DPRINTF("sock_setpar: bad mode %x\n", p->mode); + return 0; + } + f->mode = 0; + if ((p->mode & AMSG_PLAY) && dev_mix) + f->mode |= AMSG_PLAY; + if ((p->mode & AMSG_REC) && dev_sub) + f->mode |= AMSG_REC; + DPRINTF("sock_setpar: mode -> %x\n", f->mode); + } + if (AMSG_ISSET(p->bits)) { + if (p->bits < BITS_MIN || p->bits > BITS_MAX) { + DPRINTF("sock_setpar: bits out of bounds\n"); + return 0; + } + if (AMSG_ISSET(p->bps)) { + if (p->bps < ((p->bits + 7) / 8) || p->bps > 4) { + DPRINTF("sock_setpar: bps out of bounds\n"); + return 0; + } + } else + p->bps = APARAMS_BPS(p->bits); + f->rpar.bits = f->wpar.bits = p->bits; + f->rpar.bps = f->wpar.bps = p->bps; + DPRINTF("sock_setpar: bits/bps -> %u/%u\n", p->bits, p->bps); + } + if (AMSG_ISSET(p->sig)) + f->rpar.sig = f->wpar.sig = p->sig ? 1 : 0; + if (AMSG_ISSET(p->le)) + f->rpar.le = f->wpar.le = p->le ? 1 : 0; + if (AMSG_ISSET(p->msb)) + f->rpar.msb = f->wpar.msb = p->msb ? 1 : 0; + if (AMSG_ISSET(p->rchan) && (f->mode & AMSG_REC)) { + if (p->rchan < 1) + p->rchan = 1; + if (p->rchan > NCHAN_MAX - 1) + p->rchan = NCHAN_MAX - 1; + f->wpar.cmin = 0; + f->wpar.cmax = p->rchan - 1; + DPRINTF("sock_setpar: rchan -> %u\n", p->rchan); + } + if (AMSG_ISSET(p->pchan) && (f->mode & AMSG_PLAY)) { + if (p->pchan < 1) + p->pchan = 1; + if (p->pchan > NCHAN_MAX - 1) + p->pchan = NCHAN_MAX - 1; + f->rpar.cmin = 0; + f->rpar.cmax = p->pchan - 1; + DPRINTF("sock_setpar: pchan -> %u\n", p->pchan); + } + if (AMSG_ISSET(p->rate)) { + if (p->rate < RATE_MIN) + p->rate = RATE_MIN; + if (p->rate > RATE_MAX) + p->rate = RATE_MAX; + dev_roundrate(&p->rate, &f->round); + f->rpar.rate = f->wpar.rate = p->rate; + if (f->mode & AMSG_PLAY) + f->bufsz = 2 * dev_bufsz * f->rpar.rate / dev_rate; + else + f->bufsz = 2 * dev_bufsz * f->wpar.rate / dev_rate; + DPRINTF("sock_setpar: rate -> %u\n", p->rate); + } + if (AMSG_ISSET(p->xrun)) { + if (p->xrun != AMSG_IGNORE && + p->xrun != AMSG_SYNC && + p->xrun != AMSG_ERROR) { + DPRINTF("sock_setpar: bad xrun: %u\n", p->xrun); + return 0; + } + f->xrun = p->xrun; + DPRINTF("sock_setpar: xrun -> %u\n", f->xrun); + } + if (AMSG_ISSET(p->bufsz)) { + minbuf = 3 * dev_bufsz / 2; + minbuf -= minbuf % dev_round; + maxbuf = dev_bufsz; + if (f->mode & AMSG_PLAY) { + minbuf = minbuf * f->rpar.rate / dev_rate; + maxbuf = maxbuf * f->rpar.rate / dev_rate; + maxbuf += f->rpar.rate; + } else { + minbuf = minbuf * f->wpar.rate / dev_rate; + maxbuf = maxbuf * f->wpar.rate / dev_rate; + maxbuf += f->wpar.rate; + } + if (p->bufsz < minbuf) + p->bufsz = minbuf; + if (p->bufsz > maxbuf) + p->bufsz = maxbuf; + f->bufsz = p->bufsz + f->round - 1; + f->bufsz -= f->bufsz % f->round; + DPRINTF("sock_setpar: bufsz -> %u\n", f->bufsz); + } + if (debug_level > 0) { + fprintf(stderr, "sock_setpar: %p: rpar=", f); + aparams_print(&f->rpar); + fprintf(stderr, ", wpar="); + aparams_print(&f->wpar); + fprintf(stderr, ", mode=%u, bufsz=%u\n", f->mode, f->bufsz); + } + return 1; +} + +/* + * execute message in f->rmsg and change the state accordingly; return 1 + * on success, and 0 on failure, in which case the socket is destroyed. + */ +int +sock_execmsg(struct sock *f) +{ + struct amsg *m = &f->rmsg; + + switch (m->cmd) { + case AMSG_DATA: + DPRINTFN(4, "sock_execmsg: %p: DATA\n", f); + if (f->pstate != SOCK_RUN && f->pstate != SOCK_START) { + DPRINTF("sock_execmsg: %p: DATA, bad state\n", f); + aproc_del(f->pipe.file.rproc); + return 0; + } + f->rstate = SOCK_RDATA; + f->rtodo = m->u.data.size; + if (f->rtodo == 0) { + DPRINTF("sock_execmsg: zero-length data chunk\n"); + aproc_del(f->pipe.file.rproc); + return 0; + } + break; + case AMSG_START: + DPRINTFN(2, "sock_execmsg: %p: START\n", f); + if (f->pstate != SOCK_INIT) { + DPRINTF("sock_execmsg: %p: START, bad state\n", f); + aproc_del(f->pipe.file.rproc); + return 0; + } + sock_allocbuf(f); + f->rstate = SOCK_RMSG; + f->rtodo = sizeof(struct amsg); + break; + case AMSG_STOP: + DPRINTFN(2, "sock_execmsg: %p: STOP\n", f); + if (f->pstate != SOCK_RUN && f->pstate != SOCK_START) { + DPRINTF("sock_execmsg: %p: STOP, bad state\n", f); + aproc_del(f->pipe.file.rproc); + return 0; + } + if (f->pstate == SOCK_START) + (void)sock_attach(f, 1); + sock_freebuf(f); + AMSG_INIT(m); + m->cmd = AMSG_ACK; + f->rstate = SOCK_RRET; + f->rtodo = sizeof(struct amsg); + break; + case AMSG_SETPAR: + DPRINTFN(2, "sock_execmsg: %p: SETPAR\n", f); + if (f->pstate != SOCK_INIT) { + DPRINTF("sock_execmsg: %p: SETPAR, bad state\n", f); + aproc_del(f->pipe.file.rproc); + return 0; + } + if (!sock_setpar(f)) { + aproc_del(f->pipe.file.rproc); + return 0; + } + f->rtodo = sizeof(struct amsg); + f->rstate = SOCK_RMSG; + break; + case AMSG_GETPAR: + DPRINTFN(2, "sock_execmsg: %p: GETPAR\n", f); + if (f->pstate != SOCK_INIT) { + DPRINTF("sock_execmsg: %p: GETPAR, bad state\n", f); + aproc_del(f->pipe.file.rproc); + return 0; + } + AMSG_INIT(m); + m->cmd = AMSG_GETPAR; + m->u.par.mode = f->mode; + m->u.par.bits = f->rpar.bits; + m->u.par.bps = f->rpar.bps; + m->u.par.sig = f->rpar.sig; + m->u.par.le = f->rpar.le; + m->u.par.msb = f->rpar.msb; + m->u.par.rate = f->rpar.rate; + m->u.par.rchan = f->wpar.cmax - f->wpar.cmin + 1; + m->u.par.pchan = f->rpar.cmax - f->rpar.cmin + 1; + m->u.par.bufsz = f->bufsz; + m->u.par.round = f->round; + f->rstate = SOCK_RRET; + f->rtodo = sizeof(struct amsg); + break; + case AMSG_GETCAP: + DPRINTFN(2, "sock_execmsg: %p: GETCAP\n", f); + if (f->pstate != SOCK_INIT) { + DPRINTF("sock_execmsg: %p: GETCAP, bad state\n", f); + aproc_del(f->pipe.file.rproc); + return 0; + } + AMSG_INIT(m); + m->cmd = AMSG_GETCAP; + m->u.cap.rate = dev_rate; + m->u.cap.rate_div = dev_rate_div; + m->u.cap.pchan = dev_opar.cmax - dev_opar.cmin + 1; + m->u.cap.rchan = dev_ipar.cmax - dev_ipar.cmin + 1; + m->u.cap.bits = sizeof(short) * 8; + m->u.cap.bps = sizeof(short); + f->rstate = SOCK_RRET; + f->rtodo = sizeof(struct amsg); + break; + default: + DPRINTF("sock_execmsg: %p bogus command\n", f); + aproc_del(f->pipe.file.rproc); + return 0; + } + if (f->rstate == SOCK_RRET) { + if (f->wstate != SOCK_WIDLE || + !sock_wmsg(f, &f->rmsg, &f->rtodo)) + return 0; + DPRINTF("sock_execmsg: %p RRET done\n", f); + f->rtodo = sizeof(struct amsg); + f->rstate = SOCK_RMSG; + } + return 1; +} + +/* + * create a new data/pos message + */ +int +sock_buildmsg(struct sock *f) +{ + struct aproc *p; + struct abuf *ibuf; + int *pdelta; + + /* + * if pos changed, build a MOVE message + */ + pdelta = (f->mode & AMSG_REC) ? &f->idelta : &f->odelta; + if ((f->pstate == SOCK_RUN && *pdelta > 0) || + (f->pstate == SOCK_START && *pdelta < 0)) { + DPRINTFN(4, "sock_buildmsg: %p: POS: %d\n", f, *pdelta); + AMSG_INIT(&f->wmsg); + f->wmsg.cmd = AMSG_MOVE; + f->wmsg.u.ts.delta = *pdelta; + *pdelta = 0; + f->wtodo = sizeof(struct amsg); + f->wstate = SOCK_WMSG; + return 1; + } + + /* + * if data available, build a DATA message + */ + p = f->pipe.file.wproc; + ibuf = LIST_FIRST(&p->ibuflist); + if (ibuf && ABUF_ROK(ibuf)) { + AMSG_INIT(&f->wmsg); + f->wmsg.cmd = AMSG_DATA; + f->wmsg.u.data.size = ibuf->used - (ibuf->used % ibuf->bpf); + if (f->wmsg.u.data.size > AMSG_DATAMAX) + f->wmsg.u.data.size = + AMSG_DATAMAX - (AMSG_DATAMAX % ibuf->bpf); + f->wtodo = sizeof(struct amsg); + f->wstate = SOCK_WMSG; + return 1; + } + + DPRINTFN(4, "sock_buildmsg: %p: idling...\n", f); + f->wstate = SOCK_WIDLE; + return 0; +} + +/* + * read from the socket file descriptor, fill input buffer and update + * the state. Return 1 if at least one message or 1 data byte was + * processed, 0 if something blocked. + */ +int +sock_read(struct sock *f) +{ + DPRINTFN(4, "sock_read: %p; rstate = %u, rtodo = %u\n", + f, f->rstate, f->rtodo); + + switch (f->rstate) { + case SOCK_RMSG: + if (!sock_rmsg(f)) + return 0; + if (!sock_execmsg(f)) + return 0; + break; + case SOCK_RDATA: + if (!sock_rdata(f)) + return 0; + if (f->rtodo == 0) { + f->rstate = SOCK_RMSG; + f->rtodo = sizeof(struct amsg); + } + if (f->pstate == SOCK_START) + (void)sock_attach(f, 0); + break; + case SOCK_RRET: + DPRINTF("sock_read: %p: blocked in RRET\n", f); + return 0; + } + DPRINTFN(4, "sock_read: %p: done, rstate = %u\n", f, f->rstate); + return 1; +} + +/* + * process messages to return + */ +int +sock_return(struct sock *f) +{ + struct aproc *rp; + + while (f->rstate == SOCK_RRET) { + if (!sock_wmsg(f, &f->rmsg, &f->rtodo)) + return 0; + DPRINTF("sock_return: %p: done\n", f); + f->rstate = SOCK_RMSG; + f->rtodo = sizeof(struct amsg); + for (;;) { + /* + * in() may trigger rsock_done and destroy the + * wsock + */ + rp = f->pipe.file.rproc; + if (!rp || !rp->ops->in(rp, NULL)) + break; + } + if (f->pipe.file.wproc == NULL) + return 0; + } + return 1; +} + +/* + * write messages and data on the socket file descriptor. Return 1 if + * at least one message or one data byte was processed, 0 if something + * blocked. + */ +int +sock_write(struct sock *f) +{ + DPRINTFN(4, "sock_write: %p: wstate = %u, wtodo = %u\n", + f, f->wstate, f->wtodo); + + switch (f->wstate) { + case SOCK_WMSG: + if (!sock_wmsg(f, &f->wmsg, &f->wtodo)) + return 0; + if (f->wmsg.cmd != AMSG_DATA) { + f->wstate = SOCK_WIDLE; + f->wtodo = 0xdeadbeef; + break; + } + f->wstate = SOCK_WDATA; + f->wtodo = f->wmsg.u.data.size; + /* PASSTHROUGH */ + case SOCK_WDATA: + if (!sock_wdata(f)) + return 0; + if (f->wtodo > 0) + break; + f->wstate = SOCK_WIDLE; + f->wtodo = 0xdeadbeef; + /* PASSTHROUGH */ + case SOCK_WIDLE: + if (!sock_return(f)) + return 0; + if (!sock_buildmsg(f)) + return 0; + break; + default: + fprintf(stderr, "sock_write: unknown state\n"); + abort(); + } + return 1; +} + |