summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author David Gwynne <dlg@cvs.openbsd.org> 2013-10-29 04:23:17 +0000
committer David Gwynne <dlg@cvs.openbsd.org> 2013-10-29 04:23:17 +0000
commit c9ef89bbff80978bd7d7dc9a9bdbc1c5ed22d27e (patch)
tree a5b61a863575bb6b764c458ec0f934f1a91bd034
parent 7adada48c656fd8168ea74214501643ef59f5cad (diff)
introduce tasks and taskqs as an alternative to workqs.
Tasks are modelled on the timeout API, so users familiar with timeout_set, timeout_add, and timeout_del will already know what to expect from task_set, task_add, and task_del. I wrote this because workq_add_task can fail in the place you actually need it, and there aren't any good ways of recovering at that point. workq_queue_task was added to try and help, but required external state to be stored for users of that API to know whether something was already queued or not. workqs also didn't provide a way to cancel or remove work. This has been percolating with a bunch of people. Putting it in as I wrote it so I can apply their feedback to the code with the history kept in CVS.
-rw-r--r-- sys/conf/files 3
-rw-r--r-- sys/kern/init_main.c 5
-rw-r--r-- sys/kern/kern_task.c 258
-rw-r--r-- sys/sys/task.h 43
4 files changed, 307 insertions, 2 deletions
diff --git a/sys/conf/files b/sys/conf/files
index 01618e112c9..20a27803d57 100644
--- a/sys/conf/files
+++ b/sys/conf/files
@@ -1,4 +1,4 @@
-# $OpenBSD: files,v 1.560 2013/10/28 12:33:32 mpi Exp $
+# $OpenBSD: files,v 1.561 2013/10/29 04:23:16 dlg Exp $
# $NetBSD: files,v 1.87 1996/05/19 17:17:50 jonathan Exp $
# @(#)files.newconf 7.5 (Berkeley) 5/10/93
@@ -681,6 +681,7 @@ file kern/kern_time.c
file kern/kern_timeout.c
file kern/kern_watchdog.c !small_kernel
file kern/kern_workq.c
+file kern/kern_task.c
file kern/kern_xxx.c
file kern/kgdb_stub.c kgdb
file kern/sched_bsd.c
diff --git a/sys/kern/init_main.c b/sys/kern/init_main.c
index b90b4c8a844..043edce4de5 100644
--- a/sys/kern/init_main.c
+++ b/sys/kern/init_main.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: init_main.c,v 1.190 2013/08/26 01:59:22 dlg Exp $ */
+/* $OpenBSD: init_main.c,v 1.191 2013/10/29 04:23:16 dlg Exp $ */
/* $NetBSD: init_main.c,v 1.84.4.1 1996/06/02 09:08:06 mrg Exp $ */
/*
@@ -75,6 +75,7 @@
#include <sys/mbuf.h>
#include <sys/pipe.h>
#include <sys/workq.h>
+#include <sys/task.h>
#include <sys/syscall.h>
#include <sys/syscallargs.h>
@@ -148,6 +149,7 @@ void crypto_init(void);
void init_exec(void);
void kqueue_init(void);
void workq_init(void);
+void taskq_init(void);
extern char sigcode[], esigcode[];
#ifdef SYSCALL_DEBUG
@@ -340,6 +342,7 @@ main(void *framep)
/* Initialize work queues */
workq_init();
+ taskq_init();
random_start();
diff --git a/sys/kern/kern_task.c b/sys/kern/kern_task.c
new file mode 100644
index 00000000000..f4d18b54b55
--- /dev/null
+++ b/sys/kern/kern_task.c
@@ -0,0 +1,258 @@
+/* $OpenBSD: kern_task.c,v 1.1 2013/10/29 04:23:16 dlg Exp $ */
+
+/*
+ * Copyright (c) 2013 David Gwynne <dlg@openbsd.org>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <sys/param.h>
+#include <sys/systm.h>
+#include <sys/malloc.h>
+#include <sys/pool.h>
+#include <sys/queue.h>
+#include <sys/mutex.h>
+#include <sys/kthread.h>
+#include <sys/task.h>
+
+#define TASK_ONQUEUE 1 /* t_flags bit: set while the task is linked on a tq_worklist */
+
+struct taskq { /* a list of pending tasks plus the state of the threads servicing it */
+ enum {
+ TQ_S_CREATED, /* initial state; taskq_create_thread has not run yet */
+ TQ_S_RUNNING, /* set by taskq_create_thread before it starts threads */
+ TQ_S_DESTROYED /* set by taskq_destroy; threads exit once the list is empty */
+ } tq_state;
+ u_int tq_running; /* outstanding thread references; taskq_destroy sleeps until 0 */
+ u_int tq_nthreads; /* how many threads taskq_create_thread tries to start */
+ const char *tq_name; /* used as the thread name and in diagnostics */
+
+ struct mutex tq_mtx; /* protects tq_state, tq_running and tq_worklist */
+ TAILQ_HEAD(, task) tq_worklist; /* pending tasks; added at the tail, run from the head */
+};
+
+/*
+ * The statically allocated system taskq returned by taskq_systq().
+ * It starts out in the CREATED state with no threads and asks for one.
+ */
+struct taskq taskq_sys = {
+	.tq_state = TQ_S_CREATED,
+	.tq_running = 0,
+	.tq_nthreads = 1,
+	.tq_name = "systq",
+	.tq_mtx = MUTEX_INITIALIZER(IPL_HIGH),
+	.tq_worklist = TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist)
+};
+
+void taskq_init(void); /* called in init_main.c */
+void taskq_create_thread(void *); /* deferred callback: starts the threads of a taskq */
+int taskq_next_work(struct taskq *, struct task *); /* dequeue one task, sleeping while idle */
+void taskq_thread(void *); /* main loop of each taskq thread */
+
+/*
+ * Arrange for the system taskq to get its thread.  Called from main() in
+ * init_main.c; the thread itself is started when the deferred callback
+ * taskq_create_thread is invoked with the system taskq as its argument.
+ */
+void
+taskq_init(void)
+{
+	struct taskq *systq = &taskq_sys;
+
+	kthread_create_deferred(taskq_create_thread, systq);
+}
+
+/*
+ * Return a pointer to the statically allocated system taskq.
+ */
+struct taskq *
+taskq_systq(void)
+{
+	struct taskq *systq = &taskq_sys;
+
+	return (systq);
+}
+
+/*
+ * Allocate and initialise a new taskq.
+ *
+ * name is kept by pointer, not copied, so the string has to stay valid
+ * for as long as the taskq exists; it names the threads and appears in
+ * diagnostics.  nthreads is the number of threads taskq_create_thread
+ * will try to start.  ipl is handed to mtx_init for the taskq mutex.
+ *
+ * Returns the new taskq, or NULL if the M_NOWAIT allocation fails.
+ */
+struct taskq *
+taskq_create(const char *name, u_int nthreads, int ipl)
+{
+	struct taskq *ntq;
+
+	if ((ntq = malloc(sizeof(*ntq), M_DEVBUF, M_NOWAIT)) == NULL)
+		return (NULL);
+
+	mtx_init(&ntq->tq_mtx, ipl);
+	TAILQ_INIT(&ntq->tq_worklist);
+
+	ntq->tq_name = name;
+	ntq->tq_nthreads = nthreads;
+	ntq->tq_running = 0;
+	ntq->tq_state = TQ_S_CREATED;
+
+	/* try to create a thread to guarantee that tasks will be serviced */
+	kthread_create_deferred(taskq_create_thread, ntq);
+
+	return (ntq);
+}
+
+/*
+ * Tear down a taskq.
+ *
+ * If taskq_create_thread has not run yet the taskq is only marked
+ * DESTROYED and taskq_create_thread frees it later.  If it is RUNNING it
+ * is marked DESTROYED, all sleeping threads are woken, and the caller
+ * sleeps until tq_running has dropped to zero before freeing the taskq.
+ * Any other state is a panic.
+ */
+void
+taskq_destroy(struct taskq *tq)
+{
+	mtx_enter(&tq->tq_mtx);
+
+	if (tq->tq_state == TQ_S_CREATED) {
+		/* tq is still referenced by taskq_create_thread */
+		tq->tq_state = TQ_S_DESTROYED;
+		mtx_leave(&tq->tq_mtx);
+		return;
+	}
+
+	if (tq->tq_state != TQ_S_RUNNING)
+		panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state);
+
+	tq->tq_state = TQ_S_DESTROYED;
+
+	for (;;) {
+		if (tq->tq_running == 0)
+			break;
+
+		/* kick idle threads so they notice the state change */
+		wakeup(tq);
+		msleep(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy", 0);
+	}
+	mtx_leave(&tq->tq_mtx);
+
+	free(tq, M_DEVBUF);
+}
+
+/*
+ * Start the threads for a taskq.  Run as a kthread_create_deferred
+ * callback with the taskq as arg.
+ *
+ * If taskq_destroy already ran (state DESTROYED) the taskq is freed here
+ * and nothing is started.  Otherwise the state moves from CREATED to
+ * RUNNING and up to tq_nthreads threads (always at least one attempt)
+ * are created.  Creation stops at the first kthread_create failure or as
+ * soon as the taskq is seen to have been destroyed.
+ *
+ * The mutex is dropped around kthread_create, so taskq_destroy may run
+ * concurrently.  Two things keep that safe:
+ *  - the number of threads started is counted in a local variable, not
+ *    derived from tq_running, which exiting threads decrement;
+ *  - this function holds a reference of its own in tq_running for the
+ *    whole loop, so taskq_destroy cannot see zero and free the taskq
+ *    while it is still being used here.
+ */
+void
+taskq_create_thread(void *arg)
+{
+	struct taskq *tq = arg;
+	u_int nthreads = 0;
+	int rv;
+
+	mtx_enter(&tq->tq_mtx);
+
+	switch (tq->tq_state) {
+	case TQ_S_DESTROYED:
+		/* taskq_destroy got here first and left the free to us */
+		mtx_leave(&tq->tq_mtx);
+		free(tq, M_DEVBUF);
+		return;
+
+	case TQ_S_CREATED:
+		tq->tq_state = TQ_S_RUNNING;
+		break;
+
+	default:
+		panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state);
+	}
+
+	/* our own reference; keeps tq alive until we are done with it */
+	tq->tq_running++;
+
+	do {
+		/* account for the thread before it can start running */
+		tq->tq_running++;
+		mtx_leave(&tq->tq_mtx);
+
+		rv = kthread_create(taskq_thread, tq, NULL, "%s", tq->tq_name);
+
+		mtx_enter(&tq->tq_mtx);
+		if (rv != 0) {
+			printf("unable to create thread for \"%s\" taskq\n",
+			    tq->tq_name);
+
+			/* give back the count taken for the failed thread */
+			tq->tq_running--;
+			break;
+		}
+		nthreads++;
+
+		/* could have been destroyed during kthread_create */
+	} while (tq->tq_state == TQ_S_RUNNING && nthreads < tq->tq_nthreads);
+
+	/* drop our reference; we may be the last one destroy waits for */
+	if (--tq->tq_running == 0 && tq->tq_state == TQ_S_DESTROYED)
+		wakeup_one(&tq->tq_running);
+
+	mtx_leave(&tq->tq_mtx);
+}
+
+/*
+ * Initialise a task so that fn(arg1, arg2) is what runs when the task is
+ * serviced.  t_flags is cleared, so the task is treated as not queued;
+ * the list linkage is left untouched.
+ */
+void
+task_set(struct task *t, void (*fn)(void *, void *), void *arg1, void *arg2)
+{
+	t->t_flags = 0;
+
+	t->t_arg1 = arg1;
+	t->t_arg2 = arg2;
+	t->t_func = fn;
+}
+
+/*
+ * Queue task w at the tail of tq's worklist unless it is already marked
+ * as queued.
+ *
+ * Returns 1 if the task was added (and one sleeping thread was woken),
+ * 0 if TASK_ONQUEUE was already set and nothing was done.
+ */
+int
+task_add(struct taskq *tq, struct task *w)
+{
+	mtx_enter(&tq->tq_mtx);
+	if (ISSET(w->t_flags, TASK_ONQUEUE)) {
+		/* already pending, leave it where it is */
+		mtx_leave(&tq->tq_mtx);
+		return (0);
+	}
+
+	SET(w->t_flags, TASK_ONQUEUE);
+	TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry);
+	mtx_leave(&tq->tq_mtx);
+
+	/* the wakeup is issued after the mutex has been released */
+	wakeup_one(tq);
+
+	return (1);
+}
+
+/*
+ * Remove task w from tq's worklist if it is marked as queued.
+ *
+ * Returns 1 if the task was unlinked, 0 if TASK_ONQUEUE was not set.
+ * A task that a thread has already dequeued is no longer marked as
+ * queued, so 0 is returned for it as well.
+ */
+int
+task_del(struct taskq *tq, struct task *w)
+{
+	mtx_enter(&tq->tq_mtx);
+	if (!ISSET(w->t_flags, TASK_ONQUEUE)) {
+		/* not pending, nothing to unlink */
+		mtx_leave(&tq->tq_mtx);
+		return (0);
+	}
+
+	CLR(w->t_flags, TASK_ONQUEUE);
+	TAILQ_REMOVE(&tq->tq_worklist, w, t_entry);
+	mtx_leave(&tq->tq_mtx);
+
+	return (1);
+}
+
+/*
+ * Take the task at the head of tq's worklist and copy it into *work.
+ *
+ * While the list is empty the caller sleeps on tq, as long as the taskq
+ * is RUNNING.  If the list is empty and the taskq is in any other state,
+ * 0 is returned and *work is not written.  Otherwise the head task is
+ * unlinked, its TASK_ONQUEUE flag cleared, a copy stored in *work, and 1
+ * is returned.  If more tasks remain after the removal, one more thread
+ * is woken.
+ */
+int
+taskq_next_work(struct taskq *tq, struct task *work)
+{
+	struct task *t;
+	int more;
+
+	mtx_enter(&tq->tq_mtx);
+	for (;;) {
+		t = TAILQ_FIRST(&tq->tq_worklist);
+		if (t != NULL)
+			break;
+
+		if (tq->tq_state != TQ_S_RUNNING) {
+			mtx_leave(&tq->tq_mtx);
+			return (0);
+		}
+
+		msleep(tq, &tq->tq_mtx, PWAIT, "bored", 0);
+	}
+
+	TAILQ_REMOVE(&tq->tq_worklist, t, t_entry);
+	CLR(t->t_flags, TASK_ONQUEUE);
+
+	*work = *t; /* copy to caller to avoid races */
+
+	more = (TAILQ_FIRST(&tq->tq_worklist) != NULL);
+	mtx_leave(&tq->tq_mtx);
+
+	if (more)
+		wakeup_one(tq);
+
+	return (1);
+}
+
+/*
+ * Main loop of a taskq thread; xtq is the taskq it services.
+ *
+ * Tasks are fetched with taskq_next_work and run from the local copy
+ * until it reports there is nothing left to do.  The thread then drops
+ * its count in tq_running and, if it was the last one, wakes whoever is
+ * sleeping on &tq->tq_running before exiting.
+ */
+void
+taskq_thread(void *xtq)
+{
+	struct taskq *tq = xtq;
+	struct task t;
+	u_int remaining;
+
+	for (;;) {
+		if (!taskq_next_work(tq, &t))
+			break;
+
+		(*t.t_func)(t.t_arg1, t.t_arg2);
+	}
+
+	mtx_enter(&tq->tq_mtx);
+	remaining = --tq->tq_running;
+	mtx_leave(&tq->tq_mtx);
+
+	/* tq is not dereferenced past this point, only its address is used */
+	if (remaining == 0)
+		wakeup_one(&tq->tq_running);
+
+	kthread_exit(0);
+}
diff --git a/sys/sys/task.h b/sys/sys/task.h
new file mode 100644
index 00000000000..8a716230c48
--- /dev/null
+++ b/sys/sys/task.h
@@ -0,0 +1,43 @@
+/* $OpenBSD: task.h,v 1.1 2013/10/29 04:23:16 dlg Exp $ */
+
+/*
+ * Copyright (c) 2013 David Gwynne <dlg@openbsd.org>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#ifndef _SYS_TASKQ_H_
+#define _SYS_TASKQ_H_
+
+#include <sys/queue.h>
+
+struct taskq;
+
+struct task { /* one unit of deferred work; initialise with task_set() */
+ TAILQ_ENTRY(task) t_entry; /* linkage on a taskq worklist */
+ void (*t_func)(void *, void *); /* function the taskq thread calls */
+ void *t_arg1; /* first argument passed to t_func */
+ void *t_arg2; /* second argument passed to t_func */
+ u_int t_flags; /* state bits maintained by kern_task.c */
+};
+
+struct taskq *taskq_systq(void); /* the statically allocated system taskq */
+struct taskq *taskq_create(const char *, u_int, int); /* name, nthreads, ipl; NULL on failure */
+void taskq_destroy(struct taskq *); /* may sleep until the taskq's threads have exited */
+
+void task_set(struct task *, void (*)(void *, void *), /* task, function, */
+ void *, void *); /* and the two arguments the function is called with */
+int task_add(struct taskq *, struct task *); /* 1 if queued, 0 if it was already queued */
+int task_del(struct taskq *, struct task *); /* 1 if removed, 0 if it was not queued */
+
+#endif /* _SYS_TASKQ_H_ */