/* $OpenBSD: sys_pipe.c,v 1.148 2024/12/30 02:46:00 guenther Exp $ */

/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally
 * do.
 */

/*
 * NOTE(review): the header names of the #include directives below are
 * missing in this copy of the file; they need to be restored from the
 * original source before this file can be compiled.
 */
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#ifdef KTRACE
#include
#endif

#include

#include

/*
 * Both ends of a pipe are allocated together as one object.  The single
 * rwlock `pp_lock' is shared by both ends: pipe_pair_create() points the
 * `pipe_lock' member of each end at it.
 */
struct pipe_pair {
	struct pipe pp_wpipe;
	struct pipe pp_rpipe;
	struct rwlock pp_lock;
};

/*
 * interfaces to the outside world
 */
int	pipe_read(struct file *, struct uio *, int);
int	pipe_write(struct file *, struct uio *, int);
int	pipe_close(struct file *, struct proc *);
int	pipe_kqfilter(struct file *fp, struct knote *kn);
int	pipe_ioctl(struct file *, u_long, caddr_t, struct proc *);
int	pipe_stat(struct file *fp, struct stat *ub, struct proc *p);

/* File operations installed on both descriptors created by dopipe(). */
static const struct fileops pipeops = {
	.fo_read	= pipe_read,
	.fo_write	= pipe_write,
	.fo_ioctl	= pipe_ioctl,
	.fo_kqfilter	= pipe_kqfilter,
	.fo_stat	= pipe_stat,
	.fo_close	= pipe_close
};

void	filt_pipedetach(struct knote *kn);
int	filt_piperead(struct knote *kn, long hint);
int	filt_pipewrite(struct knote *kn, long hint);
int	filt_pipeexcept(struct knote *kn, long hint);
int	filt_pipemodify(struct kevent *kev, struct knote *kn);
int	filt_pipeprocess(struct knote *kn, struct kevent *kev);

/*
 * Filter operations selected by pipe_kqfilter() for EVFILT_READ,
 * EVFILT_WRITE and EVFILT_EXCEPT respectively.  The three tables differ
 * only in their f_event callback.
 */
const struct filterops pipe_rfiltops = {
	.f_flags	= FILTEROP_ISFD | FILTEROP_MPSAFE,
	.f_attach	= NULL,
	.f_detach	= filt_pipedetach,
	.f_event	= filt_piperead,
	.f_modify	= filt_pipemodify,
	.f_process	= filt_pipeprocess,
};

const struct filterops pipe_wfiltops = {
	.f_flags	= FILTEROP_ISFD | FILTEROP_MPSAFE,
	.f_attach	= NULL,
	.f_detach	= filt_pipedetach,
	.f_event	= filt_pipewrite,
	.f_modify	= filt_pipemodify,
	.f_process	= filt_pipeprocess,
};

const struct filterops pipe_efiltops = {
	.f_flags	= FILTEROP_ISFD | FILTEROP_MPSAFE,
	.f_attach	= NULL,
	.f_detach	= filt_pipedetach,
	.f_event	= filt_pipeexcept,
	.f_modify	= filt_pipemodify,
	.f_process	= filt_pipeprocess,
};

/*
 * Default pipe buffer size(s), this can be kind-of large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 *
 * MINPIPESIZE is the fill level below which pipe_read() wakes up a
 * blocked writer (write blocking hysteresis).
 */
#define MINPIPESIZE (PIPE_SIZE/3)

/*
 * Limit the number of "big" pipes
 */
#define LIMITBIGPIPES 32

/* Number of pipes whose buffer is larger than PIPE_SIZE. */
unsigned int nbigpipe;
/* Total number of bytes currently allocated for pipe buffers. */
static unsigned int amountpipekva;

/* Pool from which struct pipe_pair objects are allocated. */
struct pool pipe_pair_pool;

int	dopipe(struct proc *, int *, int);
void	pipe_wakeup(struct pipe *);

int	pipe_create(struct pipe *);
void	pipe_destroy(struct pipe *);
int	pipe_rundown(struct pipe *);
struct pipe *pipe_peer(struct pipe *);
int	pipe_buffer_realloc(struct pipe *, u_int);
void	pipe_buffer_free(struct pipe *);

int	pipe_iolock(struct pipe *);
void	pipe_iounlock(struct pipe *);
int	pipe_iosleep(struct pipe *, const char *);

struct pipe_pair *pipe_pair_create(void);
void	pipe_pair_destroy(struct pipe_pair *);

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 *
 * Forwards to dopipe() with no flags.
 */
int
sys_pipe(struct proc *p, void *v, register_t *retval)
{
	struct sys_pipe_args /* {
		syscallarg(int *) fdp;
	} */ *uap = v;

	return (dopipe(p, SCARG(uap, fdp), 0));
}

/*
 * Variant of the pipe system call that takes flags.  Only O_CLOEXEC and
 * FNONBLOCK are accepted; any other bit yields EINVAL.
 */
int
sys_pipe2(struct proc *p, void *v, register_t *retval)
{
	struct sys_pipe2_args /* {
		syscallarg(int *) fdp;
		syscallarg(int) flags;
	} */ *uap = v;

	if (SCARG(uap, flags) & ~(O_CLOEXEC | FNONBLOCK))
		return (EINVAL);

	return (dopipe(p, SCARG(uap, fdp), SCARG(uap, flags)));
}

/*
 * Common implementation of sys_pipe() and sys_pipe2().
 *
 * Creates a pipe pair, allocates two file descriptors for it (fds[0] is
 * attached to the pair's `pp_rpipe', fds[1] to its `pp_wpipe') and copies
 * the descriptor numbers out to `ufds'.
 *
 * Returns 0 on success, ENOMEM if the pipe pair could not be created, or
 * the error returned by falloc() or copyout().
 */
int
dopipe(struct proc *p, int *ufds, int flags)
{
	struct filedesc *fdp = p->p_fd;
	struct file *rf, *wf;
	struct pipe_pair *pp;
	struct pipe *rpipe, *wpipe = NULL;
	int fds[2], cloexec, error;

	cloexec = (flags & O_CLOEXEC) ? UF_EXCLOSE : 0;

	pp = pipe_pair_create();
	if (pp == NULL)
		return (ENOMEM);
	wpipe = &pp->pp_wpipe;
	rpipe = &pp->pp_rpipe;

	fdplock(fdp);

	error = falloc(p, &rf, &fds[0]);
	if (error != 0)
		goto free2;
	rf->f_flag = FREAD | FWRITE | (flags & FNONBLOCK);
	rf->f_type = DTYPE_PIPE;
	rf->f_data = rpipe;
	rf->f_ops = &pipeops;

	error = falloc(p, &wf, &fds[1]);
	if (error != 0)
		goto free3;
	wf->f_flag = FREAD | FWRITE | (flags & FNONBLOCK);
	wf->f_type = DTYPE_PIPE;
	wf->f_data = wpipe;
	wf->f_ops = &pipeops;

	fdinsert(fdp, fds[0], cloexec, rf);
	fdinsert(fdp, fds[1], cloexec, wf);

	error = copyout(fds, ufds, sizeof(fds));
	if (error == 0) {
		fdpunlock(fdp);
#ifdef KTRACE
		if (KTRPOINT(p, KTR_STRUCT))
			ktrfds(p, fds, 2);
#endif
	} else {
		/* fdrelease() unlocks fdp. */
		fdrelease(p, fds[0]);
		fdplock(fdp);
		fdrelease(p, fds[1]);
	}

	FRELE(rf, p);
	FRELE(wf, p);
	return (error);

free3:
	fdremove(fdp, fds[0]);
	/*
	 * NOTE(review): closef() presumably ends up in pipe_close(), which
	 * destroys rpipe; clearing the local pointer makes the
	 * pipe_destroy(rpipe) call below a no-op.
	 */
	closef(rf, p);
	rpipe = NULL;
free2:
	fdpunlock(fdp);
	pipe_destroy(wpipe);
	pipe_destroy(rpipe);
	return (error);
}

/*
 * Allocate kva for pipe circular buffer, the space is pageable.
 * This routine will 'realloc' the size of a pipe safely, if it fails
 * it will retain the old buffer.
 * If it fails it will return ENOMEM.
 *
 * The buffer must be empty, and the pipe must either have no buffer yet
 * or be I/O locked (PIPE_LOCK).  On success the in/out offsets are reset
 * and `amountpipekva' is increased by the new size.
 */
int
pipe_buffer_realloc(struct pipe *cpipe, u_int size)
{
	caddr_t buffer;

	/* buffer uninitialized or pipe locked */
	KASSERT((cpipe->pipe_buffer.buffer == NULL) ||
	    (cpipe->pipe_state & PIPE_LOCK));

	/* buffer should be empty */
	KASSERT(cpipe->pipe_buffer.cnt == 0);

	/* Allocate first so that the old buffer survives a failure. */
	buffer = km_alloc(size, &kv_any, &kp_pageable, &kd_waitok);
	if (buffer == NULL)
		return (ENOMEM);

	/* free old resources if we are resizing */
	pipe_buffer_free(cpipe);

	cpipe->pipe_buffer.buffer = buffer;
	cpipe->pipe_buffer.size = size;
	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;

	atomic_add_int(&amountpipekva, cpipe->pipe_buffer.size);

	return (0);
}

/*
 * initialize and allocate VM and memory for pipe
 *
 * Allocates a PIPE_SIZE buffer, initializes the sigio state and sets the
 * access, modification and change times to the current time.  Returns 0
 * on success or the error from pipe_buffer_realloc().
 */
int
pipe_create(struct pipe *cpipe)
{
	int error;

	error = pipe_buffer_realloc(cpipe, PIPE_SIZE);
	if (error != 0)
		return (error);

	sigio_init(&cpipe->pipe_sigio);

	getnanotime(&cpipe->pipe_ctime);
	cpipe->pipe_atime = cpipe->pipe_ctime;
	cpipe->pipe_mtime = cpipe->pipe_ctime;

	return (0);
}

/*
 * Return the other end of `cpipe', or NULL if there is no peer or the
 * peer has PIPE_EOF set.  The pipe lock must be held (read or write).
 */
struct pipe *
pipe_peer(struct pipe *cpipe)
{
	struct pipe *peer;

	rw_assert_anylock(cpipe->pipe_lock);

	peer = cpipe->pipe_peer;
	if (peer == NULL || (peer->pipe_state & PIPE_EOF))
		return (NULL);
	return (peer);
}

/*
 * Lock a pipe for exclusive I/O access.
 *
 * The pipe lock must be write-locked by the caller.  Sleeps while another
 * thread holds PIPE_LOCK; the sleep uses PCATCH, and a non-zero return
 * from rwsleep_nsec() is passed back to the caller without taking the
 * I/O lock.  Returns 0 once PIPE_LOCK has been acquired.
 */
int
pipe_iolock(struct pipe *cpipe)
{
	int error;

	rw_assert_wrlock(cpipe->pipe_lock);

	while (cpipe->pipe_state & PIPE_LOCK) {
		/* Ask pipe_iounlock() to wake us up. */
		cpipe->pipe_state |= PIPE_LWANT;
		error = rwsleep_nsec(cpipe, cpipe->pipe_lock, PRIBIO | PCATCH,
		    "pipeiolk", INFSLP);
		if (error)
			return (error);
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return (0);
}

/*
 * Unlock a pipe I/O lock.
 *
 * The pipe lock must be write-locked and PIPE_LOCK must be set.  Wakes up
 * sleepers on `cpipe' if PIPE_LWANT was set by pipe_iolock().
 */
void
pipe_iounlock(struct pipe *cpipe)
{
	rw_assert_wrlock(cpipe->pipe_lock);
	KASSERT(cpipe->pipe_state & PIPE_LOCK);

	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(cpipe);
	}
}

/*
 * Unlock the pipe I/O lock and go to sleep.  Returns 0 on success and the I/O
 * lock is relocked.  Otherwise if a signal was caught, non-zero is returned and
 * the I/O lock is not locked.
 *
 * Any caller must obtain a reference to the pipe by incrementing `pipe_busy'
 * before calling this function in order to ensure that the same pipe is not
 * destroyed while sleeping.
 */
int
pipe_iosleep(struct pipe *cpipe, const char *wmesg)
{
	int error;

	pipe_iounlock(cpipe);
	error = rwsleep_nsec(cpipe, cpipe->pipe_lock, PRIBIO | PCATCH, wmesg,
	    INFSLP);
	if (error)
		return (error);
	return (pipe_iolock(cpipe));
}

/*
 * Notify the knotes attached to `cpipe' and, if PIPE_ASYNC is set, post
 * SIGIO to the registered owner.  The pipe lock must be write-locked.
 */
void
pipe_wakeup(struct pipe *cpipe)
{
	rw_assert_wrlock(cpipe->pipe_lock);

	knote_locked(&cpipe->pipe_klist, 0);

	if (cpipe->pipe_state & PIPE_ASYNC)
		pgsigio(&cpipe->pipe_sigio, SIGIO, 0);
}

/*
 * fo_read handler: copy data from the buffer of the pipe behind `fp'
 * into `uio'.
 *
 * Blocks until at least some data is available unless FNONBLOCK is set
 * on the file, in which case EAGAIN is returned when the buffer is
 * empty.  Returns 0 with nothing transferred when the buffer is empty
 * and PIPE_EOF is set.  Otherwise returns 0 on success or the error from
 * pipe_iolock(), pipe_iosleep() or uiomove().
 *
 * The pipe lock is released around uiomove(); the I/O lock (PIPE_LOCK)
 * and the `pipe_busy' reference are held across it.
 */
int
pipe_read(struct file *fp, struct uio *uio, int fflags)
{
	struct pipe *rpipe = fp->f_data;
	size_t nread = 0, size;
	int error;

	rw_enter_write(rpipe->pipe_lock);
	++rpipe->pipe_busy;
	error = pipe_iolock(rpipe);
	if (error) {
		--rpipe->pipe_busy;
		pipe_rundown(rpipe);
		rw_exit_write(rpipe->pipe_lock);
		return (error);
	}

	while (uio->uio_resid) {
		/* Normal pipe buffer receive. */
		if (rpipe->pipe_buffer.cnt > 0) {
			/*
			 * Transfer at most the contiguous part up to the
			 * end of the buffer, the amount of buffered data
			 * and the amount requested.
			 */
			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			rw_exit_write(rpipe->pipe_lock);
			error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
			    size, uio);
			rw_enter_write(rpipe->pipe_lock);
			if (error) {
				break;
			}
			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;
			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if (rpipe->pipe_buffer.cnt == 0) {
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
			}
			nread += size;
		} else {
			/*
			 * detect EOF condition
			 * read returns 0 on EOF, no need to set error
			 */
			if (rpipe->pipe_state & PIPE_EOF)
				break;

			/* If the "write-side" has been blocked, wake it up. */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}

			/* Break if some data was read. */
			if (nread > 0)
				break;

			/* Handle non-blocking mode operation. */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/* Wait for more data. */
			rpipe->pipe_state |= PIPE_WANTR;
			error = pipe_iosleep(rpipe, "piperd");
			if (error)
				goto unlocked_error;
		}
	}
	pipe_iounlock(rpipe);

	if (error == 0)
		getnanotime(&rpipe->pipe_atime);
unlocked_error:
	/* Reached directly when pipe_iosleep() failed: I/O lock not held. */
	--rpipe->pipe_busy;

	if (pipe_rundown(rpipe) == 0 && rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/* Handle write blocking hysteresis. */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	/* At least PIPE_BUF bytes are free: notify knotes and SIGIO owner. */
	if (rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt >= PIPE_BUF)
		pipe_wakeup(rpipe);

	rw_exit_write(rpipe->pipe_lock);
	return (error);
}

/*
 * fo_write handler: copy data from `uio' into the buffer of the peer of
 * the pipe behind `fp'.
 *
 * Here `rpipe' is the end referenced by the file and `wpipe' is its
 * peer, whose buffer receives the data.  Returns EPIPE if the peer is
 * gone or has PIPE_EOF set, EAGAIN if the buffer has no usable space and
 * FNONBLOCK is set on the file, otherwise 0 or the error from
 * pipe_iolock(), pipe_iosleep() or uiomove().
 *
 * Writes whose total size is at most PIPE_BUF are only placed in the
 * buffer when they fit completely.  The pipe lock is released around
 * uiomove(); the I/O lock and the `pipe_busy' reference are held across
 * it.
 */
int
pipe_write(struct file *fp, struct uio *uio, int fflags)
{
	struct pipe *rpipe = fp->f_data, *wpipe;
	struct rwlock *lock = rpipe->pipe_lock;
	size_t orig_resid;
	int error;

	rw_enter_write(lock);
	wpipe = pipe_peer(rpipe);

	/*
	 * Detect loss of pipe read side, issue SIGPIPE if lost.
	 * NOTE(review): only EPIPE is returned here; the signal is
	 * presumably raised by the caller.
	 */
	if (wpipe == NULL) {
		rw_exit_write(lock);
		return (EPIPE);
	}

	++wpipe->pipe_busy;
	error = pipe_iolock(wpipe);
	if (error) {
		--wpipe->pipe_busy;
		pipe_rundown(wpipe);
		rw_exit_write(lock);
		return (error);
	}

	/* If it is advantageous to resize the pipe buffer, do so. */
	if (uio->uio_resid > PIPE_SIZE &&
	    wpipe->pipe_buffer.size <= PIPE_SIZE &&
	    wpipe->pipe_buffer.cnt == 0) {
		unsigned int npipe;

		/*
		 * Reserve a big pipe slot first; give it back if the limit
		 * is exceeded or the reallocation fails.
		 */
		npipe = atomic_inc_int_nv(&nbigpipe);
		if (npipe > LIMITBIGPIPES ||
		    pipe_buffer_realloc(wpipe, BIG_PIPE_SIZE) != 0)
			atomic_dec_int(&nbigpipe);
	}

	orig_resid = uio->uio_resid;

	while (uio->uio_resid) {
		size_t space;

		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		if (space < uio->uio_resid && orig_resid <= PIPE_BUF)
			space = 0;

		if (space > 0) {
			size_t size;	/* Transfer size */
			size_t segsize;	/* first segment to transfer */

			/*
			 * Transfer size is minimum of uio transfer
			 * and free space in pipe buffer.
			 */
			if (space > uio->uio_resid)
				size = uio->uio_resid;
			else
				size = space;
			/*
			 * First segment to transfer is minimum of
			 * transfer size and contiguous space in
			 * pipe buffer.  If first segment to transfer
			 * is less than the transfer size, we've got
			 * a wraparound in the buffer.
			 */
			segsize = wpipe->pipe_buffer.size -
			    wpipe->pipe_buffer.in;
			if (segsize > size)
				segsize = size;

			/* Transfer first segment */

			rw_exit_write(lock);
			error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
			    segsize, uio);
			rw_enter_write(lock);

			if (error == 0 && segsize < size) {
				/*
				 * Transfer remaining part now, to
				 * support atomic writes.  Wraparound
				 * happened.
				 */
#ifdef DIAGNOSTIC
				if (wpipe->pipe_buffer.in + segsize !=
				    wpipe->pipe_buffer.size)
					panic("Expected pipe buffer wraparound disappeared");
#endif

				rw_exit_write(lock);
				error = uiomove(&wpipe->pipe_buffer.buffer[0],
				    size - segsize, uio);
				rw_enter_write(lock);
			}
			if (error == 0) {
				wpipe->pipe_buffer.in += size;
				if (wpipe->pipe_buffer.in >=
				    wpipe->pipe_buffer.size) {
#ifdef DIAGNOSTIC
					if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
						panic("Expected wraparound bad");
#endif
					/* New offset is the wrapped part. */
					wpipe->pipe_buffer.in = size - segsize;
				}

				wpipe->pipe_buffer.cnt += size;
#ifdef DIAGNOSTIC
				if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
					panic("Pipe buffer overflow");
#endif
			}
			if (error)
				break;
		} else {
			/* If the "read-side" has been blocked, wake it up. */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}

			/* Don't block on non-blocking I/O. */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up select/poll.
			 */
			pipe_wakeup(wpipe);

			wpipe->pipe_state |= PIPE_WANTW;
			error = pipe_iosleep(wpipe, "pipewr");
			if (error)
				goto unlocked_error;

			/*
			 * If read side wants to go away, we just issue a
			 * signal to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}
	pipe_iounlock(wpipe);

unlocked_error:
	/* Reached directly when pipe_iosleep() failed: I/O lock not held. */
	--wpipe->pipe_busy;

	if (pipe_rundown(wpipe) == 0 && wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/* Don't return EPIPE if I/O was successful. */
	if (wpipe->pipe_buffer.cnt == 0 &&
	    uio->uio_resid == 0 &&
	    error == EPIPE) {
		error = 0;
	}

	if (error == 0)
		getnanotime(&wpipe->pipe_mtime);
	/* We have something to offer, wake up select/poll. */
	if (wpipe->pipe_buffer.cnt)
		pipe_wakeup(wpipe);

	rw_exit_write(lock);
	return (error);
}

/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 *
 * FIOASYNC sets or clears PIPE_ASYNC, FIONREAD reports the number of
 * buffered bytes, and the owner/process group requests are forwarded to
 * sigio_setown() and sigio_getown().  Any other command yields ENOTTY.
 */
int
pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct proc *p)
{
	struct pipe *mpipe = fp->f_data;
	int error = 0;

	switch (cmd) {

	case FIOASYNC:
		rw_enter_write(mpipe->pipe_lock);
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		rw_exit_write(mpipe->pipe_lock);
		break;

	case FIONREAD:
		rw_enter_read(mpipe->pipe_lock);
		*(int *)data = mpipe->pipe_buffer.cnt;
		rw_exit_read(mpipe->pipe_lock);
		break;

	case FIOSETOWN:
	case SIOCSPGRP:
	case TIOCSPGRP:
		error = sigio_setown(&mpipe->pipe_sigio, cmd, data);
		break;

	case FIOGETOWN:
	case SIOCGPGRP:
	case TIOCGPGRP:
		sigio_getown(&mpipe->pipe_sigio, cmd, data);
		break;

	default:
		error = ENOTTY;
	}

	return (error);
}

/*
 * fo_stat handler: fill in `ub' for the pipe behind `fp'.
 *
 * The structure is zeroed first.  The mode is S_IFIFO, the block size is
 * the buffer size, the size is the number of buffered bytes, the times
 * come from the pipe and the uid/gid from the file's credentials.
 * Always returns 0.
 */
int
pipe_stat(struct file *fp, struct stat *ub, struct proc *p)
{
	struct pipe *pipe = fp->f_data;

	memset(ub, 0, sizeof(*ub));

	rw_enter_read(pipe->pipe_lock);
	ub->st_mode = S_IFIFO;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	/* Number of blocks, rounded up. */
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	ub->st_atim.tv_sec = pipe->pipe_atime.tv_sec;
	ub->st_atim.tv_nsec = pipe->pipe_atime.tv_nsec;
	ub->st_mtim.tv_sec = pipe->pipe_mtime.tv_sec;
	ub->st_mtim.tv_nsec = pipe->pipe_mtime.tv_nsec;
	ub->st_ctim.tv_sec = pipe->pipe_ctime.tv_sec;
	ub->st_ctim.tv_nsec = pipe->pipe_ctime.tv_nsec;
	ub->st_uid = fp->f_cred->cr_uid;
	ub->st_gid = fp->f_cred->cr_gid;
	rw_exit_read(pipe->pipe_lock);
	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	return (0);
}

/*
 * fo_close handler: detach the pipe from the file and destroy this end
 * of the pipe.  Always returns 0.
 */
int
pipe_close(struct file *fp, struct proc *p)
{
	struct pipe *cpipe = fp->f_data;

	fp->f_ops = NULL;
	fp->f_data = NULL;
	pipe_destroy(cpipe);
	return (0);
}

/*
 * Free kva for pipe circular buffer.
 * No pipe lock check as only called from pipe_buffer_realloc() and
 * pipe_destroy().
 *
 * Does nothing if the pipe has no buffer.  Otherwise `amountpipekva' is
 * reduced by the buffer size and, for a buffer larger than PIPE_SIZE,
 * `nbigpipe' is decremented.
 */
void
pipe_buffer_free(struct pipe *cpipe)
{
	u_int size;

	if (cpipe->pipe_buffer.buffer == NULL)
		return;

	size = cpipe->pipe_buffer.size;

	km_free(cpipe->pipe_buffer.buffer, size, &kv_any, &kp_pageable);

	cpipe->pipe_buffer.buffer = NULL;

	atomic_sub_int(&amountpipekva, size);
	if (size > PIPE_SIZE)
		atomic_dec_int(&nbigpipe);
}

/*
 * shutdown the pipe, and free resources.
 *
 * Accepts NULL, in which case nothing is done.  Marks this end with
 * PIPE_EOF, waits until `pipe_busy' drops to zero, marks the peer (if
 * any) with PIPE_EOF and disconnects it, then frees the buffer.  The end
 * that finds its peer pointer already cleared, i.e. the one destroyed
 * last, also frees the pipe pair.
 */
void
pipe_destroy(struct pipe *cpipe)
{
	struct pipe *ppipe;

	if (cpipe == NULL)
		return;

	rw_enter_write(cpipe->pipe_lock);

	pipe_wakeup(cpipe);
	sigio_free(&cpipe->pipe_sigio);

	/*
	 * If the other side is blocked, wake it up saying that
	 * we want to close it down.
	 */
	cpipe->pipe_state |= PIPE_EOF;
	while (cpipe->pipe_busy) {
		wakeup(cpipe);
		/* pipe_rundown() clears PIPE_WANTD and wakes us up. */
		cpipe->pipe_state |= PIPE_WANTD;
		rwsleep_nsec(cpipe, cpipe->pipe_lock, PRIBIO, "pipecl", INFSLP);
	}

	/* Disconnect from peer. */
	if ((ppipe = cpipe->pipe_peer) != NULL) {
		pipe_wakeup(ppipe);

		ppipe->pipe_state |= PIPE_EOF;
		wakeup(ppipe);
		ppipe->pipe_peer = NULL;
	}

	pipe_buffer_free(cpipe);

	rw_exit_write(cpipe->pipe_lock);

	/* No peer left: this was the last end, release the whole pair. */
	if (ppipe == NULL)
		pipe_pair_destroy(cpipe->pipe_pair);
}

/*
 * Returns non-zero if a rundown is currently ongoing.
 *
 * A rundown is ongoing when the pipe is no longer busy and PIPE_WANTD is
 * set.  In that case the PIPE_WANTD, PIPE_WANTR and PIPE_WANTW flags are
 * cleared and sleepers on `cpipe' are woken up.  The pipe lock must be
 * write-locked.
 */
int
pipe_rundown(struct pipe *cpipe)
{
	rw_assert_wrlock(cpipe->pipe_lock);

	if (cpipe->pipe_busy > 0 || (cpipe->pipe_state & PIPE_WANTD) == 0)
		return (0);

	/* Only wakeup pipe_destroy() once the pipe is no longer busy. */
	cpipe->pipe_state &= ~(PIPE_WANTD | PIPE_WANTR | PIPE_WANTW);
	wakeup(cpipe);
	return (1);
}

/*
 * fo_kqfilter handler: attach knote `kn' to the pipe.
 *
 * EVFILT_READ and EVFILT_EXCEPT knotes are attached to the end
 * referenced by the knote's file, EVFILT_WRITE knotes to its peer (or to
 * the same end if the peer is gone).  EVFILT_EXCEPT is rejected with
 * EPERM when __EV_SELECT is set and with EINVAL when __EV_POLL is not
 * set.  Any other filter yields EINVAL.  Returns 0 on success.
 */
int
pipe_kqfilter(struct file *fp, struct knote *kn)
{
	struct pipe *rpipe = kn->kn_fp->f_data, *wpipe;
	struct rwlock *lock = rpipe->pipe_lock;
	int error = 0;

	rw_enter_write(lock);
	wpipe = pipe_peer(rpipe);

	switch (kn->kn_filter) {
	case EVFILT_READ:
		kn->kn_fop = &pipe_rfiltops;
		kn->kn_hook = rpipe;
		klist_insert_locked(&rpipe->pipe_klist, kn);
		break;
	case EVFILT_WRITE:
		if (wpipe == NULL) {
			/*
			 * The other end of the pipe has been closed.
			 * Since the filter now always indicates a pending
			 * event, attach the knote to the current side
			 * to proceed with the registration.
			 */
			wpipe = rpipe;
		}
		kn->kn_fop = &pipe_wfiltops;
		kn->kn_hook = wpipe;
		klist_insert_locked(&wpipe->pipe_klist, kn);
		break;
	case EVFILT_EXCEPT:
		if (kn->kn_flags & __EV_SELECT) {
			/* Prevent triggering exceptfds. */
			error = EPERM;
			break;
		}
		if ((kn->kn_flags & __EV_POLL) == 0) {
			/* Disallow usage through kevent(2). */
			error = EINVAL;
			break;
		}
		kn->kn_fop = &pipe_efiltops;
		kn->kn_hook = rpipe;
		klist_insert_locked(&rpipe->pipe_klist, kn);
		break;
	default:
		error = EINVAL;
	}

	rw_exit_write(lock);

	return (error);
}

/*
 * f_detach: remove the knote from the klist of the pipe end it was
 * attached to, as recorded in `kn_hook' by pipe_kqfilter().
 */
void
filt_pipedetach(struct knote *kn)
{
	struct pipe *cpipe = kn->kn_hook;

	klist_remove(&cpipe->pipe_klist, kn);
}

/*
 * f_event for EVFILT_READ.  Sets `kn_data' to the number of buffered
 * bytes.  Returns 1 with EV_EOF set (and __EV_HUP for __EV_POLL knotes)
 * if this end has PIPE_EOF set or the peer is gone; otherwise returns
 * non-zero when data is buffered.  The pipe lock must be write-locked.
 */
int
filt_piperead(struct knote *kn, long hint)
{
	struct pipe *rpipe = kn->kn_fp->f_data, *wpipe;

	rw_assert_wrlock(rpipe->pipe_lock);

	wpipe = pipe_peer(rpipe);

	kn->kn_data = rpipe->pipe_buffer.cnt;

	if ((rpipe->pipe_state & PIPE_EOF) || wpipe == NULL) {
		kn->kn_flags |= EV_EOF;
		if (kn->kn_flags & __EV_POLL)
			kn->kn_flags |= __EV_HUP;
		return (1);
	}

	return (kn->kn_data > 0);
}

/*
 * f_event for EVFILT_WRITE.  If the peer is gone, sets `kn_data' to 0
 * and EV_EOF (and __EV_HUP for __EV_POLL knotes) and returns 1.
 * Otherwise sets `kn_data' to the free space in the peer's buffer and
 * returns non-zero when at least PIPE_BUF bytes are free.  The pipe lock
 * must be write-locked.
 */
int
filt_pipewrite(struct knote *kn, long hint)
{
	struct pipe *rpipe = kn->kn_fp->f_data, *wpipe;

	rw_assert_wrlock(rpipe->pipe_lock);

	wpipe = pipe_peer(rpipe);

	if (wpipe == NULL) {
		kn->kn_data = 0;
		kn->kn_flags |= EV_EOF;
		if (kn->kn_flags & __EV_POLL)
			kn->kn_flags |= __EV_HUP;
		return (1);
	}
	kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

	return (kn->kn_data >= PIPE_BUF);
}

/*
 * f_event for EVFILT_EXCEPT.  Only __EV_POLL knotes can become active:
 * they get __EV_HUP and the function returns 1 when this end has
 * PIPE_EOF set or the peer is gone.  Otherwise returns 0.  The pipe lock
 * must be write-locked.
 */
int
filt_pipeexcept(struct knote *kn, long hint)
{
	struct pipe *rpipe = kn->kn_fp->f_data, *wpipe;
	int active = 0;

	rw_assert_wrlock(rpipe->pipe_lock);

	wpipe = pipe_peer(rpipe);

	if (kn->kn_flags & __EV_POLL) {
		if ((rpipe->pipe_state & PIPE_EOF) || wpipe == NULL) {
			kn->kn_flags |= __EV_HUP;
			active = 1;
		}
	}

	return (active);
}

/*
 * f_modify: run knote_modify() with the pipe lock write-locked and
 * return its result.
 */
int
filt_pipemodify(struct kevent *kev, struct knote *kn)
{
	struct pipe *rpipe = kn->kn_fp->f_data;
	int active;

	rw_enter_write(rpipe->pipe_lock);
	active = knote_modify(kev, kn);
	rw_exit_write(rpipe->pipe_lock);

	return (active);
}

/*
 * f_process: run knote_process() with the pipe lock write-locked and
 * return its result.
 */
int
filt_pipeprocess(struct knote *kn, struct kevent *kev)
{
	struct pipe *rpipe = kn->kn_fp->f_data;
	int active;

	rw_enter_write(rpipe->pipe_lock);
	active = knote_process(kn, kev);
	rw_exit_write(rpipe->pipe_lock);

	return (active);
}

/*
 * Initialize the pool that struct pipe_pair objects are allocated from.
 */
void
pipe_init(void)
{
	pool_init(&pipe_pair_pool, sizeof(struct pipe_pair), 0, IPL_MPFLOOR,
	    PR_WAITOK, "pipepl", NULL);
}

/*
 * Allocate and initialize a pipe pair: both ends point at each other and
 * at the pair, share the pair's rwlock and have their klists bound to
 * that lock.  Returns the pair, or NULL if pipe_create() failed for
 * either end.
 */
struct pipe_pair *
pipe_pair_create(void)
{
	struct pipe_pair *pp;

	pp = pool_get(&pipe_pair_pool, PR_WAITOK | PR_ZERO);

	pp->pp_wpipe.pipe_pair = pp;
	pp->pp_rpipe.pipe_pair = pp;
	pp->pp_wpipe.pipe_peer = &pp->pp_rpipe;
	pp->pp_rpipe.pipe_peer = &pp->pp_wpipe;
	/*
	 * One lock is used per pipe pair in order to obtain exclusive access to
	 * the pipe pair.
	 */
	rw_init(&pp->pp_lock, "pipelk");
	pp->pp_wpipe.pipe_lock = &pp->pp_lock;
	pp->pp_rpipe.pipe_lock = &pp->pp_lock;

	klist_init_rwlock(&pp->pp_wpipe.pipe_klist, &pp->pp_lock);
	klist_init_rwlock(&pp->pp_rpipe.pipe_klist, &pp->pp_lock);

	if (pipe_create(&pp->pp_wpipe) || pipe_create(&pp->pp_rpipe))
		goto err;
	return (pp);
err:
	/*
	 * Destroying both ends releases whatever was allocated; the second
	 * pipe_destroy() finds its peer pointer cleared and frees the pair.
	 */
	pipe_destroy(&pp->pp_wpipe);
	pipe_destroy(&pp->pp_rpipe);
	return (NULL);
}

/*
 * Release the klists of both ends and return the pair to the pool.
 */
void
pipe_pair_destroy(struct pipe_pair *pp)
{
	klist_free(&pp->pp_wpipe.pipe_klist);
	klist_free(&pp->pp_rpipe.pipe_klist);
	pool_put(&pipe_pair_pool, pp);
}