/*	$OpenBSD: kern_task.c,v 1.22 2017/12/14 00:45:16 dlg Exp $ */

/*
 * Copyright (c) 2013 David Gwynne <dlg@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/kthread.h>
#include <sys/task.h>
#include <sys/proc.h>

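/* t_flags bit set while a task is linked onto a taskq's worklist */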
#define TASK_ONQUEUE	1

struct taskq {
	enum {
		TQ_S_CREATED,		/* worker threads not yet forked */
		TQ_S_RUNNING,
		TQ_S_DESTROYED
	} tq_state;
	unsigned int	 tq_running;	/* worker threads alive */
	unsigned int	 tq_nthreads;	/* worker threads requested */
	unsigned int	 tq_flags;	/* TASKQ_* flags, fixed at creation */
	const char	*tq_name;

	struct mutex	 tq_mtx;	/* protects tq_state, tq_running,
					   and tq_worklist */
	struct task_list tq_worklist;	/* pending tasks, FIFO order */
};

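/*
 * The statically allocated system taskqs, exported via the systq and
 * systqmp pointers below.  Their worker threads are set up by
 * taskq_init() during boot.
 */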
struct taskq taskq_sys = {
	TQ_S_CREATED,
	0,
	1,
	0,
	"systq",
	MUTEX_INITIALIZER(IPL_HIGH),
	TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist)
};

struct taskq taskq_sys_mp = {
	TQ_S_CREATED,
	0,
	1,
	TASKQ_MPSAFE,
	"systqmp",
	MUTEX_INITIALIZER(IPL_HIGH),
	TAILQ_HEAD_INITIALIZER(taskq_sys_mp.tq_worklist)
};

typedef int (*sleepfn)(const volatile void *, struct mutex *, int,
    const char *, int);

struct taskq *const systq = &taskq_sys;
struct taskq *const systqmp = &taskq_sys_mp;

void	taskq_init(void); /* called in init_main.c */
void	taskq_create_thread(void *);
void	taskq_barrier_task(void *);
int	taskq_sleep(const volatile void *, struct mutex *, int,
	    const char *, int);
int	taskq_next_work(struct taskq *, struct task *, sleepfn);
void	taskq_thread(void *);

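/*
 * taskq_init() runs from init_main.c during boot; it defers creation of
 * the system taskq worker threads until kthreads can be forked.
 */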
void
taskq_init(void)
{
	kthread_create_deferred(taskq_create_thread, systq);
	kthread_create_deferred(taskq_create_thread, systqmp);
}

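/*
 * taskq_create() allocates and initializes a taskq and defers creation
 * of its worker threads.  Tasks may be queued to it immediately, but
 * nothing runs them until taskq_create_thread() forks the workers.
 */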
struct taskq *
taskq_create(const char *name, unsigned int nthreads, int ipl,
    unsigned int flags)
{
	struct taskq *tq;

	tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK);
	if (tq == NULL)
		return (NULL);

	tq->tq_state = TQ_S_CREATED;
	tq->tq_running = 0;
	tq->tq_nthreads = nthreads;
	tq->tq_name = name;
	tq->tq_flags = flags;

	mtx_init_flags(&tq->tq_mtx, ipl, name, 0);
	TAILQ_INIT(&tq->tq_worklist);

	/* try to create a thread to guarantee that tasks will be serviced */
	kthread_create_deferred(taskq_create_thread, tq);

	return (tq);
}

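/*
 * Typical usage, as an illustrative sketch only ("mydrv", mydrv_work,
 * sc, and the IPL are hypothetical, not part of this file):
 *
 *	struct taskq *tq = taskq_create("mydrv", 1, IPL_NET, 0);
 *	struct task t;
 *
 *	task_set(&t, mydrv_work, sc);
 *	task_add(tq, &t);
 */

/*
 * taskq_destroy() marks the taskq dead, wakes its workers, and sleeps
 * until the last one exits before freeing the structure.  If the worker
 * threads were never forked, taskq_create_thread() frees it instead.
 */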
void
taskq_destroy(struct taskq *tq)
{
	mtx_enter(&tq->tq_mtx);
	switch (tq->tq_state) {
	case TQ_S_CREATED:
		/* tq is still referenced by taskq_create_thread */
		tq->tq_state = TQ_S_DESTROYED;
		mtx_leave(&tq->tq_mtx);
		return;

	case TQ_S_RUNNING:
		tq->tq_state = TQ_S_DESTROYED;
		break;

	default:
		panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state);
	}

	while (tq->tq_running > 0) {
		wakeup(tq);
		msleep(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy", 0);
	}
	mtx_leave(&tq->tq_mtx);

	free(tq, M_DEVBUF, sizeof(*tq));
}

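/*
 * taskq_create_thread() runs once kthreads can be forked; it forks
 * tq_nthreads worker threads, or frees the taskq if taskq_destroy()
 * already ran against it.
 */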
void
taskq_create_thread(void *arg)
{
	struct taskq *tq = arg;
	int rv;

	mtx_enter(&tq->tq_mtx);

	switch (tq->tq_state) {
	case TQ_S_DESTROYED:
		mtx_leave(&tq->tq_mtx);
		free(tq, M_DEVBUF, sizeof(*tq));
		return;

	case TQ_S_CREATED:
		tq->tq_state = TQ_S_RUNNING;
		break;

	default:
		panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state);
	}

	do {
		tq->tq_running++;
		mtx_leave(&tq->tq_mtx);

		rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name);

		mtx_enter(&tq->tq_mtx);
		if (rv != 0) {
			printf("unable to create thread for \"%s\" taskq\n",
			    tq->tq_name);

			tq->tq_running--;
			/* could have been destroyed during kthread_create */
			if (tq->tq_state == TQ_S_DESTROYED &&
			    tq->tq_running == 0)
				wakeup_one(&tq->tq_running);
			break;
		}
	} while (tq->tq_running < tq->tq_nthreads);

	mtx_leave(&tq->tq_mtx);
}

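/*
 * taskq_barrier() queues a task at the tail of the worklist and sleeps
 * until it runs, so on a taskq with one thread every task queued before
 * the barrier has completed by the time this returns.
 */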
void
taskq_barrier(struct taskq *tq)
{
	struct cond c = COND_INITIALIZER();
	struct task t = TASK_INITIALIZER(taskq_barrier_task, &c);

	task_add(tq, &t);

	cond_wait(&c, "tqbar");
}

void
taskq_barrier_task(void *p)
{
	struct cond *c = p;
	cond_signal(c);
}

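/*
 * task_set() binds a function and argument to a task and clears its
 * flags, so a freshly set task can always be queued with task_add().
 */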
void
task_set(struct task *t, void (*fn)(void *), void *arg)
{
	t->t_func = fn;
	t->t_arg = arg;
	t->t_flags = 0;
}

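/*
 * task_add() and task_del() use an unlocked check of TASK_ONQUEUE as a
 * fast path and recheck it under tq_mtx before touching the worklist.
 * Both return 1 if they changed the task's state and 0 otherwise.
 */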
int
task_add(struct taskq *tq, struct task *w)
{
	int rv = 0;

	if (ISSET(w->t_flags, TASK_ONQUEUE))
		return (0);

	mtx_enter(&tq->tq_mtx);
	if (!ISSET(w->t_flags, TASK_ONQUEUE)) {
		rv = 1;
		SET(w->t_flags, TASK_ONQUEUE);
		TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry);
	}
	mtx_leave(&tq->tq_mtx);

	if (rv)
		wakeup_one(tq);

	return (rv);
}

int
task_del(struct taskq *tq, struct task *w)
{
	int rv = 0;

	if (!ISSET(w->t_flags, TASK_ONQUEUE))
		return (0);

	mtx_enter(&tq->tq_mtx);
	if (ISSET(w->t_flags, TASK_ONQUEUE)) {
		rv = 1;
		CLR(w->t_flags, TASK_ONQUEUE);
		TAILQ_REMOVE(&tq->tq_worklist, w, t_entry);
	}
	mtx_leave(&tq->tq_mtx);

	return (rv);
}

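/*
 * taskq_sleep() is the sleep function for TASKQ_CANTSLEEP queues: it
 * temporarily clears P_CANTSLEEP so an otherwise non-sleeping worker
 * may block while waiting for more work.
 */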
int
taskq_sleep(const volatile void *ident, struct mutex *mtx, int priority,
    const char *wmesg, int tmo)
{
	u_int *flags = &curproc->p_flag;
	int rv;

	atomic_clearbits_int(flags, P_CANTSLEEP);
	rv = msleep(ident, mtx, priority, wmesg, tmo);
	atomic_setbits_int(flags, P_CANTSLEEP);

	return (rv);
}

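/*
 * taskq_next_work() dequeues the next task, copying it into the
 * caller's storage so the worker can run it after dropping tq_mtx
 * without racing against task_del().  Returns 0 when the taskq is
 * being destroyed and the worker should exit.
 */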
int
taskq_next_work(struct taskq *tq, struct task *work, sleepfn tqsleep)
{
	struct task *next;

	mtx_enter(&tq->tq_mtx);
	while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) {
		if (tq->tq_state != TQ_S_RUNNING) {
			mtx_leave(&tq->tq_mtx);
			return (0);
		}

		tqsleep(tq, &tq->tq_mtx, PWAIT, "bored", 0);
	}

	TAILQ_REMOVE(&tq->tq_worklist, next, t_entry);
	CLR(next->t_flags, TASK_ONQUEUE);

	*work = *next; /* copy to caller to avoid races */

	next = TAILQ_FIRST(&tq->tq_worklist);
	mtx_leave(&tq->tq_mtx);

	if (next != NULL && tq->tq_nthreads > 1)
		wakeup_one(tq);

	return (1);
}

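/*
 * taskq_thread() is the worker body: it drops the kernel lock for
 * MPSAFE queues, marks itself P_CANTSLEEP if requested, then loops
 * running tasks until the taskq is destroyed.  The last worker out
 * wakes taskq_destroy().
 */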
void
taskq_thread(void *xtq)
{
	sleepfn tqsleep = msleep;
	struct taskq *tq = xtq;
	struct task work;
	int last;

	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
		KERNEL_UNLOCK();

	if (ISSET(tq->tq_flags, TASKQ_CANTSLEEP)) {
		tqsleep = taskq_sleep;
		atomic_setbits_int(&curproc->p_flag, P_CANTSLEEP);
	}

	while (taskq_next_work(tq, &work, tqsleep)) {
		(*work.t_func)(work.t_arg);
		sched_pause(yield);
	}

	mtx_enter(&tq->tq_mtx);
	last = (--tq->tq_running == 0);
	mtx_leave(&tq->tq_mtx);

	if (ISSET(tq->tq_flags, TASKQ_CANTSLEEP))
		atomic_clearbits_int(&curproc->p_flag, P_CANTSLEEP);

	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
		KERNEL_LOCK();

	if (last)
		wakeup_one(&tq->tq_running);

	kthread_exit(0);
}