From b88384b256aade92f4b379501be34474b1901884 Mon Sep 17 00:00:00 2001
From: Martin Pieuchot
Date: Sat, 4 Nov 2017 14:13:54 +0000
Subject: Make it possible for multiple threads to enter kqueue_scan() in
 parallel.

This is a requirement to use a sleeping lock inside kqueue filters.
It is now possible, but not recommended, to sleep inside ``f_event''.

Threads iterating over the list of pending events now recognize and
skip other threads' markers.  knote_acquire() and knote_release()
must be used to "own" a knote, making sure no other thread is
sleeping with a reference on it.

The acquire and marker logic is taken from DragonFly, but the
KERNEL_LOCK() still serializes the execution of the kqueue code.

This also enables the NET_LOCK() in socket filters.

Tested by abieber@ & juanfra@, run by naddy@ in a bulk build,
ok visa@, bluhm@
---
 sys/kern/kern_event.c  | 61 ++++++++++++++++++++++++++++++++++++++++++++++++--
 sys/kern/uipc_socket.c | 19 +++++++++++++---
 2 files changed, 75 insertions(+), 5 deletions(-)

(limited to 'sys/kern')

diff --git a/sys/kern/kern_event.c b/sys/kern/kern_event.c
index e2de0aabbf8..b13feef56a4 100644
--- a/sys/kern/kern_event.c
+++ b/sys/kern/kern_event.c
@@ -1,4 +1,4 @@
-/*	$OpenBSD: kern_event.c,v 1.81 2017/10/11 08:06:56 mpi Exp $	*/
+/*	$OpenBSD: kern_event.c,v 1.82 2017/11/04 14:13:53 mpi Exp $	*/
 
 /*-
  * Copyright (c) 1999,2000,2001 Jonathan Lemon
@@ -84,6 +84,8 @@ void	knote_attach(struct knote *kn, struct filedesc *fdp);
 void	knote_drop(struct knote *kn, struct proc *p, struct filedesc *fdp);
 void	knote_enqueue(struct knote *kn);
 void	knote_dequeue(struct knote *kn);
+int	knote_acquire(struct knote *kn);
+int	knote_release(struct knote *kn);
 
 #define knote_alloc() ((struct knote *)pool_get(&knote_pool, PR_WAITOK))
 #define knote_free(kn) pool_put(&knote_pool, (kn))
@@ -759,27 +761,43 @@ start:
 		goto done;
 	}
 
+	marker.kn_filter = EVFILT_MARKER;
+	marker.kn_status = KN_PROCESSING;
 	TAILQ_INSERT_TAIL(&kq->kq_head, &marker, kn_tqe);
 	while (count) {
 		kn = TAILQ_FIRST(&kq->kq_head);
 		if (kn == &marker) {
-			TAILQ_REMOVE(&kq->kq_head, kn, kn_tqe);
+			TAILQ_REMOVE(&kq->kq_head, &marker, kn_tqe);
 			splx(s);
 			if (count == maxevents)
 				goto retry;
 			goto done;
 		}
+		if (kn->kn_filter == EVFILT_MARKER) {
+			struct knote *other_marker = kn;
+
+			/* Move some other thread's marker past this kn */
+			kn = TAILQ_NEXT(other_marker, kn_tqe);
+			TAILQ_REMOVE(&kq->kq_head, kn, kn_tqe);
+			TAILQ_INSERT_BEFORE(other_marker, kn, kn_tqe);
+			continue;
+		}
+
+		if (!knote_acquire(kn))
+			continue;
 		TAILQ_REMOVE(&kq->kq_head, kn, kn_tqe);
 		kq->kq_count--;
 		if (kn->kn_status & KN_DISABLED) {
 			kn->kn_status &= ~KN_QUEUED;
+			knote_release(kn);
 			continue;
 		}
 		if ((kn->kn_flags & EV_ONESHOT) == 0 &&
 		    kn->kn_fop->f_event(kn, 0) == 0) {
 			kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE);
+			knote_release(kn);
 			continue;
 		}
 		*kevp = kn->kn_kevent;
 		kevp++;
 		nkev++;
@@ -799,9 +817,11 @@ start:
 			if (kn->kn_flags & EV_DISPATCH)
 				kn->kn_status |= KN_DISABLED;
 			kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE);
+			knote_release(kn);
 		} else {
 			TAILQ_INSERT_TAIL(&kq->kq_head, kn, kn_tqe);
 			kq->kq_count++;
+			knote_release(kn);
 		}
 		count--;
 		if (nkev == KQ_NEVENTS) {
@@ -955,6 +975,41 @@ kqueue_wakeup(struct kqueue *kq)
 		KNOTE(&kq->kq_sel.si_note, 0);
 }
 
+/*
+ * Acquire a knote, return non-zero on success, 0 on failure.
+ *
+ * If we cannot acquire the knote we sleep and return 0.  The knote
+ * may be stale on return in this case and the caller must restart
+ * whatever loop they are in.
+ */
+int
+knote_acquire(struct knote *kn)
+{
+	if (kn->kn_status & KN_PROCESSING) {
+		kn->kn_status |= KN_WAITING;
+		tsleep(kn, 0, "kqepts", hz);
+		/* knote may be stale now */
+		return (0);
+	}
+	kn->kn_status |= KN_PROCESSING;
+	return (1);
+}
+
+/*
+ * Release an acquired knote, clearing KN_PROCESSING.
+ */
+int
+knote_release(struct knote *kn)
+{
+	if (kn->kn_status & KN_WAITING) {
+		kn->kn_status &= ~KN_WAITING;
+		wakeup(kn);
+	}
+	kn->kn_status &= ~KN_PROCESSING;
+	/* kn should not be accessed anymore */
+	return (0);
+}
+
 /*
  * activate one knote.
  */
@@ -986,6 +1041,8 @@ knote_remove(struct proc *p, struct klist *list)
 	struct knote *kn;
 
 	while ((kn = SLIST_FIRST(list)) != NULL) {
+		if (!knote_acquire(kn))
+			continue;
 		kn->kn_fop->f_detach(kn);
 		knote_drop(kn, p, p->p_fd);
 	}
diff --git a/sys/kern/uipc_socket.c b/sys/kern/uipc_socket.c
index ce240b62fe6..92efb46fd6e 100644
--- a/sys/kern/uipc_socket.c
+++ b/sys/kern/uipc_socket.c
@@ -1,4 +1,4 @@
-/*	$OpenBSD: uipc_socket.c,v 1.206 2017/11/02 14:01:18 florian Exp $	*/
+/*	$OpenBSD: uipc_socket.c,v 1.207 2017/11/04 14:13:53 mpi Exp $	*/
 /*	$NetBSD: uipc_socket.c,v 1.21 1996/02/04 02:17:52 christos Exp $	*/
 
 /*
@@ -1922,8 +1922,10 @@ int
 filt_soread(struct knote *kn, long hint)
 {
 	struct socket *so = kn->kn_fp->f_data;
-	int rv;
+	int s, rv;
 
+	if (!(hint & NOTE_SUBMIT))
+		s = solock(so);
 	kn->kn_data = so->so_rcv.sb_cc;
 #ifdef SOCKET_SPLICE
 	if (isspliced(so)) {
@@ -1941,6 +1943,8 @@ filt_soread(struct knote *kn, long hint)
 	} else {
 		rv = (kn->kn_data >= so->so_rcv.sb_lowat);
 	}
+	if (!(hint & NOTE_SUBMIT))
+		sounlock(s);
 
 	return rv;
 }
@@ -1961,8 +1965,10 @@ int
 filt_sowrite(struct knote *kn, long hint)
 {
 	struct socket *so = kn->kn_fp->f_data;
-	int rv;
+	int s, rv;
 
+	if (!(hint & NOTE_SUBMIT))
+		s = solock(so);
 	kn->kn_data = sbspace(so, &so->so_snd);
 	if (so->so_state & SS_CANTSENDMORE) {
 		kn->kn_flags |= EV_EOF;
@@ -1978,6 +1984,8 @@ filt_sowrite(struct knote *kn, long hint)
 	} else {
 		rv = (kn->kn_data >= so->so_snd.sb_lowat);
 	}
+	if (!(hint & NOTE_SUBMIT))
+		sounlock(s);
 
 	return (rv);
 }
@@ -1986,8 +1994,13 @@ int
 filt_solisten(struct knote *kn, long hint)
 {
 	struct socket *so = kn->kn_fp->f_data;
+	int s;
 
+	if (!(hint & NOTE_SUBMIT))
+		s = solock(so);
 	kn->kn_data = so->so_qlen;
+	if (!(hint & NOTE_SUBMIT))
+		sounlock(s);
 
 	return (kn->kn_data != 0);
 }
--
cgit v1.2.3
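
The marker shuffle in kqueue_scan() is easier to follow in isolation.
Below is a minimal userland sketch of the same TAILQ manipulation,
assuming hypothetical stand-ins (struct node, is_marker, scan()) for
struct knote, EVFILT_MARKER, and kqueue_scan(); it shows only the list
handling, with none of the kernel's locking, sleeping, or event
delivery.

#include <sys/queue.h>
#include <stdio.h>

struct node {
	TAILQ_ENTRY(node) link;
	int is_marker;		/* stands in for kn_filter == EVFILT_MARKER */
	int value;
};
TAILQ_HEAD(nodehead, node);

static void
scan(struct nodehead *head, struct node *my_marker)
{
	struct node *n;

	/* Bound the scan: everything queued ahead of our marker is pending. */
	TAILQ_INSERT_TAIL(head, my_marker, link);
	for (;;) {
		n = TAILQ_FIRST(head);
		if (n == my_marker) {
			/* Reached our own marker: this scan is complete. */
			TAILQ_REMOVE(head, my_marker, link);
			return;
		}
		if (n->is_marker) {
			/*
			 * Another thread's marker: move the node that
			 * follows it in front of it, i.e. walk the
			 * foreign marker past one pending node, then
			 * look at the head again.  Our own marker sits
			 * behind every foreign marker, so TAILQ_NEXT()
			 * cannot be NULL here.
			 */
			struct node *other = n;

			n = TAILQ_NEXT(other, link);
			TAILQ_REMOVE(head, n, link);
			TAILQ_INSERT_BEFORE(other, n, link);
			continue;
		}
		/* A real pending node: consume it. */
		TAILQ_REMOVE(head, n, link);
		printf("processed node %d\n", n->value);
	}
}

int
main(void)
{
	struct nodehead head = TAILQ_HEAD_INITIALIZER(head);
	struct node a = { .value = 1 }, b = { .value = 2 };
	struct node foreign = { .is_marker = 1 };	/* a parked scan */
	struct node mine = { .is_marker = 1 };

	TAILQ_INSERT_TAIL(&head, &foreign, link);
	TAILQ_INSERT_TAIL(&head, &a, link);
	TAILQ_INSERT_TAIL(&head, &b, link);
	scan(&head, &mine);	/* prints nodes 1 and 2 despite the foreign marker */
	return 0;
}

A foreign marker at the head means another thread's scan is parked
there; moving the next node in front of it lets this thread keep
consuming events, while the other scan, when it resumes, still stops
at its own marker.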
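
knote_acquire() and knote_release() implement a simple ownership gate:
KN_PROCESSING marks a knote as owned, KN_WAITING records that another
thread wants it.  In the kernel the flag updates are serialized by the
KERNEL_LOCK() and the wait is a tsleep() bounded by a one-second
timeout; the sketch below is a userland analog that substitutes a
pthread mutex and condition variable.  The fake_knote names and the
flag values are invented for the illustration.

#include <pthread.h>

#define KN_PROCESSING	0x01	/* knote is owned by a thread (arbitrary value) */
#define KN_WAITING	0x02	/* a thread is sleeping on the knote (arbitrary value) */

struct fake_knote {
	int		status;
	pthread_mutex_t	mtx;	/* stands in for the KERNEL_LOCK() */
	pthread_cond_t	cv;	/* stands in for tsleep()/wakeup() */
};

/*
 * Return 1 if the calling thread now owns the knote; return 0 after
 * sleeping, in which case the knote may be stale and the caller must
 * restart whatever loop it is in.
 */
int
fake_knote_acquire(struct fake_knote *kn)
{
	int acquired = 1;

	pthread_mutex_lock(&kn->mtx);
	if (kn->status & KN_PROCESSING) {
		kn->status |= KN_WAITING;
		pthread_cond_wait(&kn->cv, &kn->mtx);
		acquired = 0;	/* slept: caller restarts */
	} else
		kn->status |= KN_PROCESSING;
	pthread_mutex_unlock(&kn->mtx);
	return acquired;
}

/* Drop ownership and wake any thread that blocked in acquire. */
void
fake_knote_release(struct fake_knote *kn)
{
	pthread_mutex_lock(&kn->mtx);
	if (kn->status & KN_WAITING) {
		kn->status &= ~KN_WAITING;
		pthread_cond_broadcast(&kn->cv);
	}
	kn->status &= ~KN_PROCESSING;
	pthread_mutex_unlock(&kn->mtx);
}

int
main(void)
{
	struct fake_knote kn = {
		.status = 0,
		.mtx = PTHREAD_MUTEX_INITIALIZER,
		.cv = PTHREAD_COND_INITIALIZER,
	};

	if (fake_knote_acquire(&kn))	/* succeeds: nobody owns it */
		fake_knote_release(&kn);
	return 0;
}

The "may be stale" return is the crucial design point: a thread that
slept cannot trust any pointer it held across the sleep, which is why
both kqueue_scan() and knote_remove() restart their loops whenever
knote_acquire() returns 0.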
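
The socket filter changes follow the NOTE_SUBMIT convention: when the
protocol layer fires KNOTE() it already holds the socket lock and
passes NOTE_SUBMIT in the hint, while kqueue_scan() invokes f_event
with a hint of 0 and no lock held.  Locking only when NOTE_SUBMIT is
absent keeps the filter from taking the lock recursively.  Here is a
generic sketch of that pattern under the same assumptions as above
(struct object and filt_object are hypothetical; only the NOTE_SUBMIT
name comes from the source, and its value here merely mirrors
<sys/event.h>).

#include <pthread.h>

#define NOTE_SUBMIT	0x01000000	/* hint: caller already holds the lock */

struct object {
	pthread_mutex_t	lock;	/* stands in for solock()/sounlock() */
	int		count;	/* state the filter inspects */
};

/*
 * Hypothetical event filter.  The object's owner calls it with
 * NOTE_SUBMIT set while already holding the lock; a scanning thread
 * calls it with hint == 0, so the filter must lock around its reads.
 */
int
filt_object(struct object *obj, long hint)
{
	int rv;

	if (!(hint & NOTE_SUBMIT))
		pthread_mutex_lock(&obj->lock);
	rv = (obj->count > 0);
	if (!(hint & NOTE_SUBMIT))
		pthread_mutex_unlock(&obj->lock);
	return rv;
}

This is what makes it safe for the commit to enable the NET_LOCK() in
filt_soread(), filt_sowrite(), and filt_solisten(): the filters now
sleep taking the socket lock only on the kqueue_scan() path, which the
new acquire/marker machinery permits.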