diff options
-rw-r--r-- | sys/conf/files | 3 | ||||
-rw-r--r-- | sys/kern/init_main.c | 5 | ||||
-rw-r--r-- | sys/kern/kern_task.c | 258 | ||||
-rw-r--r-- | sys/sys/task.h | 43 |
4 files changed, 307 insertions, 2 deletions
diff --git a/sys/conf/files b/sys/conf/files index 01618e112c9..20a27803d57 100644 --- a/sys/conf/files +++ b/sys/conf/files @@ -1,4 +1,4 @@ -# $OpenBSD: files,v 1.560 2013/10/28 12:33:32 mpi Exp $ +# $OpenBSD: files,v 1.561 2013/10/29 04:23:16 dlg Exp $ # $NetBSD: files,v 1.87 1996/05/19 17:17:50 jonathan Exp $ # @(#)files.newconf 7.5 (Berkeley) 5/10/93 @@ -681,6 +681,7 @@ file kern/kern_time.c file kern/kern_timeout.c file kern/kern_watchdog.c !small_kernel file kern/kern_workq.c +file kern/kern_task.c file kern/kern_xxx.c file kern/kgdb_stub.c kgdb file kern/sched_bsd.c diff --git a/sys/kern/init_main.c b/sys/kern/init_main.c index b90b4c8a844..043edce4de5 100644 --- a/sys/kern/init_main.c +++ b/sys/kern/init_main.c @@ -1,4 +1,4 @@ -/* $OpenBSD: init_main.c,v 1.190 2013/08/26 01:59:22 dlg Exp $ */ +/* $OpenBSD: init_main.c,v 1.191 2013/10/29 04:23:16 dlg Exp $ */ /* $NetBSD: init_main.c,v 1.84.4.1 1996/06/02 09:08:06 mrg Exp $ */ /* @@ -75,6 +75,7 @@ #include <sys/mbuf.h> #include <sys/pipe.h> #include <sys/workq.h> +#include <sys/task.h> #include <sys/syscall.h> #include <sys/syscallargs.h> @@ -148,6 +149,7 @@ void crypto_init(void); void init_exec(void); void kqueue_init(void); void workq_init(void); +void taskq_init(void); extern char sigcode[], esigcode[]; #ifdef SYSCALL_DEBUG @@ -340,6 +342,7 @@ main(void *framep) /* Initialize work queues */ workq_init(); + taskq_init(); random_start(); diff --git a/sys/kern/kern_task.c b/sys/kern/kern_task.c new file mode 100644 index 00000000000..f4d18b54b55 --- /dev/null +++ b/sys/kern/kern_task.c @@ -0,0 +1,258 @@ +/* $OpenBSD: kern_task.c,v 1.1 2013/10/29 04:23:16 dlg Exp $ */ + +/* + * Copyright (c) 2013 David Gwynne <dlg@openbsd.org> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include <sys/param.h> +#include <sys/systm.h> +#include <sys/malloc.h> +#include <sys/pool.h> +#include <sys/queue.h> +#include <sys/mutex.h> +#include <sys/kthread.h> +#include <sys/task.h> + +#define TASK_ONQUEUE 1 + +struct taskq { + enum { + TQ_S_CREATED, + TQ_S_RUNNING, + TQ_S_DESTROYED + } tq_state; + u_int tq_running; + u_int tq_nthreads; + const char *tq_name; + + struct mutex tq_mtx; + TAILQ_HEAD(, task) tq_worklist; +}; + +struct taskq taskq_sys = { + TQ_S_CREATED, + 0, + 1, + "systq", + MUTEX_INITIALIZER(IPL_HIGH), + TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist) +}; + +void taskq_init(void); /* called in init_main.c */ +void taskq_create_thread(void *); +int taskq_next_work(struct taskq *, struct task *); +void taskq_thread(void *); + +void +taskq_init(void) +{ + kthread_create_deferred(taskq_create_thread, &taskq_sys); +} + +struct taskq * +taskq_systq(void) +{ + return (&taskq_sys); +} + +struct taskq * +taskq_create(const char *name, u_int nthreads, int ipl) +{ + struct taskq *tq; + + tq = malloc(sizeof(*tq), M_DEVBUF, M_NOWAIT); + if (tq == NULL) + return (NULL); + + tq->tq_state = TQ_S_CREATED; + tq->tq_running = 0; + tq->tq_nthreads = nthreads; + tq->tq_name = name; + + mtx_init(&tq->tq_mtx, ipl); + TAILQ_INIT(&tq->tq_worklist); + + /* try to create a thread to guarantee that tasks will be serviced */ + kthread_create_deferred(taskq_create_thread, tq); + + return (tq); +} + +void +taskq_destroy(struct taskq *tq) +{ + mtx_enter(&tq->tq_mtx); + switch (tq->tq_state) { + case TQ_S_CREATED: + /* tq is still referenced by taskq_create_thread */ + tq->tq_state = TQ_S_DESTROYED; + mtx_leave(&tq->tq_mtx); + return; + + case TQ_S_RUNNING: + tq->tq_state = TQ_S_DESTROYED; + break; + + default: + panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state); + } + + while (tq->tq_running > 0) { + wakeup(tq); + msleep(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy", 0); + } + mtx_leave(&tq->tq_mtx); + + free(tq, M_DEVBUF); +} + +void +taskq_create_thread(void *arg) +{ + struct taskq *tq = arg; + int rv; + + mtx_enter(&tq->tq_mtx); + + switch (tq->tq_state) { + case TQ_S_DESTROYED: + mtx_leave(&tq->tq_mtx); + free(tq, M_DEVBUF); + return; + + case TQ_S_CREATED: + tq->tq_state = TQ_S_RUNNING; + break; + + default: + panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state); + } + + do { + tq->tq_running++; + mtx_leave(&tq->tq_mtx); + + rv = kthread_create(taskq_thread, tq, NULL, "%s", tq->tq_name); + + mtx_enter(&tq->tq_mtx); + if (rv != 0) { + printf("unable to create thread for \"%s\" taskq\n", + tq->tq_name); + + tq->tq_running--; + /* could have been destroyed during kthread_create */ + if (tq->tq_state == TQ_S_DESTROYED && + tq->tq_running == 0) + wakeup_one(&tq->tq_running); + break; + } + } while (tq->tq_running < tq->tq_nthreads); + + mtx_leave(&tq->tq_mtx); +} + +void +task_set(struct task *t, void (*fn)(void *, void *), void *arg1, void *arg2) +{ + t->t_func = fn; + t->t_arg1 = arg1; + t->t_arg2 = arg2; + + t->t_flags = 0; +} + +int +task_add(struct taskq *tq, struct task *w) +{ + int rv = 0; + + mtx_enter(&tq->tq_mtx); + if (!ISSET(w->t_flags, TASK_ONQUEUE)) { + rv = 1; + SET(w->t_flags, TASK_ONQUEUE); + TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry); + } + mtx_leave(&tq->tq_mtx); + + if (rv) + wakeup_one(tq); + + return (rv); +} + +int +task_del(struct taskq *tq, struct task *w) +{ + int rv = 0; + + mtx_enter(&tq->tq_mtx); + if (ISSET(w->t_flags, TASK_ONQUEUE)) { + rv = 1; + CLR(w->t_flags, TASK_ONQUEUE); + TAILQ_REMOVE(&tq->tq_worklist, w, t_entry); + } + mtx_leave(&tq->tq_mtx); + + return (rv); +} + +int +taskq_next_work(struct taskq *tq, struct task *work) +{ + struct task *next; + + mtx_enter(&tq->tq_mtx); + while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) { + if (tq->tq_state != TQ_S_RUNNING) { + mtx_leave(&tq->tq_mtx); + return (0); + } + + msleep(tq, &tq->tq_mtx, PWAIT, "bored", 0); + } + + TAILQ_REMOVE(&tq->tq_worklist, next, t_entry); + CLR(next->t_flags, TASK_ONQUEUE); + + *work = *next; /* copy to caller to avoid races */ + + next = TAILQ_FIRST(&tq->tq_worklist); + mtx_leave(&tq->tq_mtx); + + if (next != NULL) + wakeup_one(tq); + + return (1); +} + +void +taskq_thread(void *xtq) +{ + struct taskq *tq = xtq; + struct task work; + int last; + + while (taskq_next_work(tq, &work)) + (*work.t_func)(work.t_arg1, work.t_arg2); + + mtx_enter(&tq->tq_mtx); + last = (--tq->tq_running == 0); + mtx_leave(&tq->tq_mtx); + + if (last) + wakeup_one(&tq->tq_running); + + kthread_exit(0); +} diff --git a/sys/sys/task.h b/sys/sys/task.h new file mode 100644 index 00000000000..8a716230c48 --- /dev/null +++ b/sys/sys/task.h @@ -0,0 +1,43 @@ +/* $OpenBSD: task.h,v 1.1 2013/10/29 04:23:16 dlg Exp $ */ + +/* + * Copyright (c) 2013 David Gwynne <dlg@openbsd.org> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#ifndef _SYS_TASKQ_H_ +#define _SYS_TASKQ_H_ + +#include <sys/queue.h> + +struct taskq; + +struct task { + TAILQ_ENTRY(task) t_entry; + void (*t_func)(void *, void *); + void *t_arg1; + void *t_arg2; + u_int t_flags; +}; + +struct taskq *taskq_systq(void); +struct taskq *taskq_create(const char *, u_int, int); +void taskq_destroy(struct taskq *); + +void task_set(struct task *, void (*)(void *, void *), + void *, void *); +int task_add(struct taskq *, struct task *); +int task_del(struct taskq *, struct task *); + +#endif /* _SYS_TASKQ_H_ */ |