diff options
author | David Gwynne <dlg@cvs.openbsd.org> | 2013-10-29 04:23:17 +0000 |
---|---|---|
committer | David Gwynne <dlg@cvs.openbsd.org> | 2013-10-29 04:23:17 +0000 |
commit | c9ef89bbff80978bd7d7dc9a9bdbc1c5ed22d27e (patch) | |
tree | a5b61a863575bb6b764c458ec0f934f1a91bd034 | |
parent | 7adada48c656fd8168ea74214501643ef59f5cad (diff) |
introduce tasks and taskqs as an alternative to workqs.
tasks are modelled on the timeout api, so users familiar with
timeout_set, timeout_add, and timeout_del will already know what
to expect from task_set, task_add, and task_del.
i wrote this because workq_add_task can fail in the place you
actually need it, and there aren't any good ways of recovering at
that point. workq_queue_task was added to try and help, but required
external state to be stored for users of that api to know whether
something was already queued or not.
workqs also didn't provide a way to cancel or remove work.
this has been percolating with a bunch of people. putting it in as i
wrote it so i can apply their feedback to the code with the history kept
in cvs.
-rw-r--r-- | sys/conf/files | 3 | ||||
-rw-r--r-- | sys/kern/init_main.c | 5 | ||||
-rw-r--r-- | sys/kern/kern_task.c | 258 | ||||
-rw-r--r-- | sys/sys/task.h | 43 |
4 files changed, 307 insertions, 2 deletions
diff --git a/sys/conf/files b/sys/conf/files index 01618e112c9..20a27803d57 100644 --- a/sys/conf/files +++ b/sys/conf/files @@ -1,4 +1,4 @@ -# $OpenBSD: files,v 1.560 2013/10/28 12:33:32 mpi Exp $ +# $OpenBSD: files,v 1.561 2013/10/29 04:23:16 dlg Exp $ # $NetBSD: files,v 1.87 1996/05/19 17:17:50 jonathan Exp $ # @(#)files.newconf 7.5 (Berkeley) 5/10/93 @@ -681,6 +681,7 @@ file kern/kern_time.c file kern/kern_timeout.c file kern/kern_watchdog.c !small_kernel file kern/kern_workq.c +file kern/kern_task.c file kern/kern_xxx.c file kern/kgdb_stub.c kgdb file kern/sched_bsd.c diff --git a/sys/kern/init_main.c b/sys/kern/init_main.c index b90b4c8a844..043edce4de5 100644 --- a/sys/kern/init_main.c +++ b/sys/kern/init_main.c @@ -1,4 +1,4 @@ -/* $OpenBSD: init_main.c,v 1.190 2013/08/26 01:59:22 dlg Exp $ */ +/* $OpenBSD: init_main.c,v 1.191 2013/10/29 04:23:16 dlg Exp $ */ /* $NetBSD: init_main.c,v 1.84.4.1 1996/06/02 09:08:06 mrg Exp $ */ /* @@ -75,6 +75,7 @@ #include <sys/mbuf.h> #include <sys/pipe.h> #include <sys/workq.h> +#include <sys/task.h> #include <sys/syscall.h> #include <sys/syscallargs.h> @@ -148,6 +149,7 @@ void crypto_init(void); void init_exec(void); void kqueue_init(void); void workq_init(void); +void taskq_init(void); extern char sigcode[], esigcode[]; #ifdef SYSCALL_DEBUG @@ -340,6 +342,7 @@ main(void *framep) /* Initialize work queues */ workq_init(); + taskq_init(); random_start(); diff --git a/sys/kern/kern_task.c b/sys/kern/kern_task.c new file mode 100644 index 00000000000..f4d18b54b55 --- /dev/null +++ b/sys/kern/kern_task.c @@ -0,0 +1,258 @@ +/* $OpenBSD: kern_task.c,v 1.1 2013/10/29 04:23:16 dlg Exp $ */ + +/* + * Copyright (c) 2013 David Gwynne <dlg@openbsd.org> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include <sys/param.h> +#include <sys/systm.h> +#include <sys/malloc.h> +#include <sys/pool.h> +#include <sys/queue.h> +#include <sys/mutex.h> +#include <sys/kthread.h> +#include <sys/task.h> + +#define TASK_ONQUEUE 1 + +struct taskq { + enum { + TQ_S_CREATED, + TQ_S_RUNNING, + TQ_S_DESTROYED + } tq_state; + u_int tq_running; + u_int tq_nthreads; + const char *tq_name; + + struct mutex tq_mtx; + TAILQ_HEAD(, task) tq_worklist; +}; + +struct taskq taskq_sys = { + TQ_S_CREATED, + 0, + 1, + "systq", + MUTEX_INITIALIZER(IPL_HIGH), + TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist) +}; + +void taskq_init(void); /* called in init_main.c */ +void taskq_create_thread(void *); +int taskq_next_work(struct taskq *, struct task *); +void taskq_thread(void *); + +void +taskq_init(void) +{ + kthread_create_deferred(taskq_create_thread, &taskq_sys); +} + +struct taskq * +taskq_systq(void) +{ + return (&taskq_sys); +} + +struct taskq * +taskq_create(const char *name, u_int nthreads, int ipl) +{ + struct taskq *tq; + + tq = malloc(sizeof(*tq), M_DEVBUF, M_NOWAIT); + if (tq == NULL) + return (NULL); + + tq->tq_state = TQ_S_CREATED; + tq->tq_running = 0; + tq->tq_nthreads = nthreads; + tq->tq_name = name; + + mtx_init(&tq->tq_mtx, ipl); + TAILQ_INIT(&tq->tq_worklist); + + /* try to create a thread to guarantee that tasks will be serviced */ + kthread_create_deferred(taskq_create_thread, tq); + + return (tq); +} + +void +taskq_destroy(struct taskq *tq) +{ + 
mtx_enter(&tq->tq_mtx); + switch (tq->tq_state) { + case TQ_S_CREATED: + /* tq is still referenced by taskq_create_thread */ + tq->tq_state = TQ_S_DESTROYED; + mtx_leave(&tq->tq_mtx); + return; + + case TQ_S_RUNNING: + tq->tq_state = TQ_S_DESTROYED; + break; + + default: + panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state); + } + + while (tq->tq_running > 0) { + wakeup(tq); + msleep(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy", 0); + } + mtx_leave(&tq->tq_mtx); + + free(tq, M_DEVBUF); +} + +void +taskq_create_thread(void *arg) +{ + struct taskq *tq = arg; + int rv; + + mtx_enter(&tq->tq_mtx); + + switch (tq->tq_state) { + case TQ_S_DESTROYED: + mtx_leave(&tq->tq_mtx); + free(tq, M_DEVBUF); + return; + + case TQ_S_CREATED: + tq->tq_state = TQ_S_RUNNING; + break; + + default: + panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state); + } + + do { + tq->tq_running++; + mtx_leave(&tq->tq_mtx); + + rv = kthread_create(taskq_thread, tq, NULL, "%s", tq->tq_name); + + mtx_enter(&tq->tq_mtx); + if (rv != 0) { + printf("unable to create thread for \"%s\" taskq\n", + tq->tq_name); + + tq->tq_running--; + /* could have been destroyed during kthread_create */ + if (tq->tq_state == TQ_S_DESTROYED && + tq->tq_running == 0) + wakeup_one(&tq->tq_running); + break; + } + } while (tq->tq_running < tq->tq_nthreads); + + mtx_leave(&tq->tq_mtx); +} + +void +task_set(struct task *t, void (*fn)(void *, void *), void *arg1, void *arg2) +{ + t->t_func = fn; + t->t_arg1 = arg1; + t->t_arg2 = arg2; + + t->t_flags = 0; +} + +int +task_add(struct taskq *tq, struct task *w) +{ + int rv = 0; + + mtx_enter(&tq->tq_mtx); + if (!ISSET(w->t_flags, TASK_ONQUEUE)) { + rv = 1; + SET(w->t_flags, TASK_ONQUEUE); + TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry); + } + mtx_leave(&tq->tq_mtx); + + if (rv) + wakeup_one(tq); + + return (rv); +} + +int +task_del(struct taskq *tq, struct task *w) +{ + int rv = 0; + + mtx_enter(&tq->tq_mtx); + if (ISSET(w->t_flags, TASK_ONQUEUE)) { + rv = 1; 
+ CLR(w->t_flags, TASK_ONQUEUE); + TAILQ_REMOVE(&tq->tq_worklist, w, t_entry); + } + mtx_leave(&tq->tq_mtx); + + return (rv); +} + +int +taskq_next_work(struct taskq *tq, struct task *work) +{ + struct task *next; + + mtx_enter(&tq->tq_mtx); + while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) { + if (tq->tq_state != TQ_S_RUNNING) { + mtx_leave(&tq->tq_mtx); + return (0); + } + + msleep(tq, &tq->tq_mtx, PWAIT, "bored", 0); + } + + TAILQ_REMOVE(&tq->tq_worklist, next, t_entry); + CLR(next->t_flags, TASK_ONQUEUE); + + *work = *next; /* copy to caller to avoid races */ + + next = TAILQ_FIRST(&tq->tq_worklist); + mtx_leave(&tq->tq_mtx); + + if (next != NULL) + wakeup_one(tq); + + return (1); +} + +void +taskq_thread(void *xtq) +{ + struct taskq *tq = xtq; + struct task work; + int last; + + while (taskq_next_work(tq, &work)) + (*work.t_func)(work.t_arg1, work.t_arg2); + + mtx_enter(&tq->tq_mtx); + last = (--tq->tq_running == 0); + mtx_leave(&tq->tq_mtx); + + if (last) + wakeup_one(&tq->tq_running); + + kthread_exit(0); +} diff --git a/sys/sys/task.h b/sys/sys/task.h new file mode 100644 index 00000000000..8a716230c48 --- /dev/null +++ b/sys/sys/task.h @@ -0,0 +1,43 @@ +/* $OpenBSD: task.h,v 1.1 2013/10/29 04:23:16 dlg Exp $ */ + +/* + * Copyright (c) 2013 David Gwynne <dlg@openbsd.org> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. 
IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#ifndef _SYS_TASKQ_H_ +#define _SYS_TASKQ_H_ + +#include <sys/queue.h> + +struct taskq; + +struct task { + TAILQ_ENTRY(task) t_entry; + void (*t_func)(void *, void *); + void *t_arg1; + void *t_arg2; + u_int t_flags; +}; + +struct taskq *taskq_systq(void); +struct taskq *taskq_create(const char *, u_int, int); +void taskq_destroy(struct taskq *); + +void task_set(struct task *, void (*)(void *, void *), + void *, void *); +int task_add(struct taskq *, struct task *); +int task_del(struct taskq *, struct task *); + +#endif /* _SYS_TASKQ_H_ */ |