summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sys/conf/files3
-rw-r--r--sys/kern/init_main.c5
-rw-r--r--sys/kern/kern_task.c258
-rw-r--r--sys/sys/task.h43
4 files changed, 307 insertions, 2 deletions
diff --git a/sys/conf/files b/sys/conf/files
index 01618e112c9..20a27803d57 100644
--- a/sys/conf/files
+++ b/sys/conf/files
@@ -1,4 +1,4 @@
-# $OpenBSD: files,v 1.560 2013/10/28 12:33:32 mpi Exp $
+# $OpenBSD: files,v 1.561 2013/10/29 04:23:16 dlg Exp $
# $NetBSD: files,v 1.87 1996/05/19 17:17:50 jonathan Exp $
# @(#)files.newconf 7.5 (Berkeley) 5/10/93
@@ -681,6 +681,7 @@ file kern/kern_time.c
file kern/kern_timeout.c
file kern/kern_watchdog.c !small_kernel
file kern/kern_workq.c
+file kern/kern_task.c
file kern/kern_xxx.c
file kern/kgdb_stub.c kgdb
file kern/sched_bsd.c
diff --git a/sys/kern/init_main.c b/sys/kern/init_main.c
index b90b4c8a844..043edce4de5 100644
--- a/sys/kern/init_main.c
+++ b/sys/kern/init_main.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: init_main.c,v 1.190 2013/08/26 01:59:22 dlg Exp $ */
+/* $OpenBSD: init_main.c,v 1.191 2013/10/29 04:23:16 dlg Exp $ */
/* $NetBSD: init_main.c,v 1.84.4.1 1996/06/02 09:08:06 mrg Exp $ */
/*
@@ -75,6 +75,7 @@
#include <sys/mbuf.h>
#include <sys/pipe.h>
#include <sys/workq.h>
+#include <sys/task.h>
#include <sys/syscall.h>
#include <sys/syscallargs.h>
@@ -148,6 +149,7 @@ void crypto_init(void);
void init_exec(void);
void kqueue_init(void);
void workq_init(void);
+void taskq_init(void);
extern char sigcode[], esigcode[];
#ifdef SYSCALL_DEBUG
@@ -340,6 +342,7 @@ main(void *framep)
/* Initialize work queues */
workq_init();
+ taskq_init();
random_start();
diff --git a/sys/kern/kern_task.c b/sys/kern/kern_task.c
new file mode 100644
index 00000000000..f4d18b54b55
--- /dev/null
+++ b/sys/kern/kern_task.c
@@ -0,0 +1,258 @@
+/* $OpenBSD: kern_task.c,v 1.1 2013/10/29 04:23:16 dlg Exp $ */
+
+/*
+ * Copyright (c) 2013 David Gwynne <dlg@openbsd.org>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <sys/param.h>
+#include <sys/systm.h>
+#include <sys/malloc.h>
+#include <sys/pool.h>
+#include <sys/queue.h>
+#include <sys/mutex.h>
+#include <sys/kthread.h>
+#include <sys/task.h>
+
+#define TASK_ONQUEUE 1
+
+struct taskq {
+ enum {
+ TQ_S_CREATED,
+ TQ_S_RUNNING,
+ TQ_S_DESTROYED
+ } tq_state;
+ u_int tq_running;
+ u_int tq_nthreads;
+ const char *tq_name;
+
+ struct mutex tq_mtx;
+ TAILQ_HEAD(, task) tq_worklist;
+};
+
+struct taskq taskq_sys = {
+ TQ_S_CREATED,
+ 0,
+ 1,
+ "systq",
+ MUTEX_INITIALIZER(IPL_HIGH),
+ TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist)
+};
+
+void taskq_init(void); /* called in init_main.c */
+void taskq_create_thread(void *);
+int taskq_next_work(struct taskq *, struct task *);
+void taskq_thread(void *);
+
+void
+taskq_init(void)
+{
+ kthread_create_deferred(taskq_create_thread, &taskq_sys);
+}
+
+struct taskq *
+taskq_systq(void)
+{
+ return (&taskq_sys);
+}
+
+struct taskq *
+taskq_create(const char *name, u_int nthreads, int ipl)
+{
+ struct taskq *tq;
+
+ tq = malloc(sizeof(*tq), M_DEVBUF, M_NOWAIT);
+ if (tq == NULL)
+ return (NULL);
+
+ tq->tq_state = TQ_S_CREATED;
+ tq->tq_running = 0;
+ tq->tq_nthreads = nthreads;
+ tq->tq_name = name;
+
+ mtx_init(&tq->tq_mtx, ipl);
+ TAILQ_INIT(&tq->tq_worklist);
+
+ /* try to create a thread to guarantee that tasks will be serviced */
+ kthread_create_deferred(taskq_create_thread, tq);
+
+ return (tq);
+}
+
+void
+taskq_destroy(struct taskq *tq)
+{
+ mtx_enter(&tq->tq_mtx);
+ switch (tq->tq_state) {
+ case TQ_S_CREATED:
+ /* tq is still referenced by taskq_create_thread */
+ tq->tq_state = TQ_S_DESTROYED;
+ mtx_leave(&tq->tq_mtx);
+ return;
+
+ case TQ_S_RUNNING:
+ tq->tq_state = TQ_S_DESTROYED;
+ break;
+
+ default:
+ panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state);
+ }
+
+ while (tq->tq_running > 0) {
+ wakeup(tq);
+ msleep(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy", 0);
+ }
+ mtx_leave(&tq->tq_mtx);
+
+ free(tq, M_DEVBUF);
+}
+
+void
+taskq_create_thread(void *arg)
+{
+ struct taskq *tq = arg;
+ int rv;
+
+ mtx_enter(&tq->tq_mtx);
+
+ switch (tq->tq_state) {
+ case TQ_S_DESTROYED:
+ mtx_leave(&tq->tq_mtx);
+ free(tq, M_DEVBUF);
+ return;
+
+ case TQ_S_CREATED:
+ tq->tq_state = TQ_S_RUNNING;
+ break;
+
+ default:
+ panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state);
+ }
+
+ do {
+ tq->tq_running++;
+ mtx_leave(&tq->tq_mtx);
+
+ rv = kthread_create(taskq_thread, tq, NULL, "%s", tq->tq_name);
+
+ mtx_enter(&tq->tq_mtx);
+ if (rv != 0) {
+ printf("unable to create thread for \"%s\" taskq\n",
+ tq->tq_name);
+
+ tq->tq_running--;
+ /* could have been destroyed during kthread_create */
+ if (tq->tq_state == TQ_S_DESTROYED &&
+ tq->tq_running == 0)
+ wakeup_one(&tq->tq_running);
+ break;
+ }
+ } while (tq->tq_running < tq->tq_nthreads);
+
+ mtx_leave(&tq->tq_mtx);
+}
+
+void
+task_set(struct task *t, void (*fn)(void *, void *), void *arg1, void *arg2)
+{
+ t->t_func = fn;
+ t->t_arg1 = arg1;
+ t->t_arg2 = arg2;
+
+ t->t_flags = 0;
+}
+
+int
+task_add(struct taskq *tq, struct task *w)
+{
+ int rv = 0;
+
+ mtx_enter(&tq->tq_mtx);
+ if (!ISSET(w->t_flags, TASK_ONQUEUE)) {
+ rv = 1;
+ SET(w->t_flags, TASK_ONQUEUE);
+ TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry);
+ }
+ mtx_leave(&tq->tq_mtx);
+
+ if (rv)
+ wakeup_one(tq);
+
+ return (rv);
+}
+
+int
+task_del(struct taskq *tq, struct task *w)
+{
+ int rv = 0;
+
+ mtx_enter(&tq->tq_mtx);
+ if (ISSET(w->t_flags, TASK_ONQUEUE)) {
+ rv = 1;
+ CLR(w->t_flags, TASK_ONQUEUE);
+ TAILQ_REMOVE(&tq->tq_worklist, w, t_entry);
+ }
+ mtx_leave(&tq->tq_mtx);
+
+ return (rv);
+}
+
+int
+taskq_next_work(struct taskq *tq, struct task *work)
+{
+ struct task *next;
+
+ mtx_enter(&tq->tq_mtx);
+ while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) {
+ if (tq->tq_state != TQ_S_RUNNING) {
+ mtx_leave(&tq->tq_mtx);
+ return (0);
+ }
+
+ msleep(tq, &tq->tq_mtx, PWAIT, "bored", 0);
+ }
+
+ TAILQ_REMOVE(&tq->tq_worklist, next, t_entry);
+ CLR(next->t_flags, TASK_ONQUEUE);
+
+ *work = *next; /* copy to caller to avoid races */
+
+ next = TAILQ_FIRST(&tq->tq_worklist);
+ mtx_leave(&tq->tq_mtx);
+
+ if (next != NULL)
+ wakeup_one(tq);
+
+ return (1);
+}
+
+void
+taskq_thread(void *xtq)
+{
+ struct taskq *tq = xtq;
+ struct task work;
+ int last;
+
+ while (taskq_next_work(tq, &work))
+ (*work.t_func)(work.t_arg1, work.t_arg2);
+
+ mtx_enter(&tq->tq_mtx);
+ last = (--tq->tq_running == 0);
+ mtx_leave(&tq->tq_mtx);
+
+ if (last)
+ wakeup_one(&tq->tq_running);
+
+ kthread_exit(0);
+}
diff --git a/sys/sys/task.h b/sys/sys/task.h
new file mode 100644
index 00000000000..8a716230c48
--- /dev/null
+++ b/sys/sys/task.h
@@ -0,0 +1,43 @@
+/* $OpenBSD: task.h,v 1.1 2013/10/29 04:23:16 dlg Exp $ */
+
+/*
+ * Copyright (c) 2013 David Gwynne <dlg@openbsd.org>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#ifndef _SYS_TASKQ_H_
+#define _SYS_TASKQ_H_
+
+#include <sys/queue.h>
+
+struct taskq;
+
+struct task {
+ TAILQ_ENTRY(task) t_entry;
+ void (*t_func)(void *, void *);
+ void *t_arg1;
+ void *t_arg2;
+ u_int t_flags;
+};
+
+struct taskq *taskq_systq(void);
+struct taskq *taskq_create(const char *, u_int, int);
+void taskq_destroy(struct taskq *);
+
+void task_set(struct task *, void (*)(void *, void *),
+ void *, void *);
+int task_add(struct taskq *, struct task *);
+int task_del(struct taskq *, struct task *);
+
+#endif /* _SYS_TASKQ_H_ */