Line | Exclusive | Inclusive | Code |
---|---|---|---|
1 | # This file is a part of Julia. License is MIT: https://julialang.org/license | ||
2 | |||
3 | ## condition variables | ||
4 | |||
5 | """ | ||
6 | Condition() | ||
7 | |||
8 | Create an edge-triggered event source that tasks can wait for. Tasks that call [`wait`](@ref) on a | ||
9 | `Condition` are suspended and queued. Tasks are woken up when [`notify`](@ref) is later called on | ||
10 | the `Condition`. Edge triggering means that only tasks waiting at the time [`notify`](@ref) is | ||
11 | called can be woken up. For level-triggered notifications, you must keep extra state to keep | ||
12 | track of whether a notification has happened. The [`Channel`](@ref) type does | ||
13 | this, and so can be used for level-triggered events. | ||
14 | """ | ||
15 | mutable struct Condition | ||
16 | waitq::Vector{Any} | ||
17 | |||
18 | Condition() = new([]) | ||
19 | end | ||
20 | |||
21 | """ | ||
22 | wait([x]) | ||
23 | |||
24 | Block the current task until some event occurs, depending on the type of the argument: | ||
25 | |||
26 | * [`Channel`](@ref): Wait for a value to be appended to the channel. | ||
27 | * [`Condition`](@ref): Wait for [`notify`](@ref) on a condition. | ||
28 | * `Process`: Wait for a process or process chain to exit. The `exitcode` field of a process | ||
29 | can be used to determine success or failure. | ||
30 | * [`Task`](@ref): Wait for a `Task` to finish. If the task fails with an exception, the | ||
31 | exception is propagated (re-thrown in the task that called `wait`). | ||
32 | * [`RawFD`](@ref): Wait for changes on a file descriptor (see the `FileWatching` package). | ||
33 | |||
34 | If no argument is passed, the task blocks for an undefined period. A task can only be | ||
35 | restarted by an explicit call to [`schedule`](@ref) or [`yieldto`](@ref). | ||
36 | |||
37 | Often `wait` is called within a `while` loop to ensure a waited-for condition is met before | ||
38 | proceeding. | ||
39 | """ | ||
40 | function wait(c::Condition) | ||
41 | ct = current_task() | ||
42 | |||
43 | push!(c.waitq, ct) | ||
44 | |||
45 | try | ||
46 | return wait() | ||
47 | catch | ||
48 | filter!(x->x!==ct, c.waitq) | ||
49 | rethrow() | ||
50 | end | ||
51 | end | ||
52 | |||
53 | """ | ||
54 | notify(condition, val=nothing; all=true, error=false) | ||
55 | |||
56 | Wake up tasks waiting for a condition, passing them `val`. If `all` is `true` (the default), | ||
57 | all waiting tasks are woken, otherwise only one is. If `error` is `true`, the passed value | ||
58 | is raised as an exception in the woken tasks. | ||
59 | |||
60 | Return the count of tasks woken up. Return 0 if no tasks are waiting on `condition`. | ||
61 | """ | ||
62 | notify(c::Condition, @nospecialize(arg = nothing); all=true, error=false) = notify(c, arg, all, error) | ||
63 | function notify(c::Condition, arg, all, error) | ||
64 | cnt = 0 | ||
65 | if all | ||
66 | cnt = length(c.waitq) | ||
67 | for t in c.waitq | ||
68 | error ? schedule(t, arg, error=error) : schedule(t, arg) | ||
69 | end | ||
70 | empty!(c.waitq) | ||
71 | elseif !isempty(c.waitq) | ||
72 | cnt = 1 | ||
73 | t = popfirst!(c.waitq) | ||
74 | error ? schedule(t, arg, error=error) : schedule(t, arg) | ||
75 | end | ||
76 | cnt | ||
77 | end | ||
78 | |||
79 | notify_error(c::Condition, err) = notify(c, err, true, true) | ||
80 | |||
81 | n_waiters(c::Condition) = length(c.waitq) | ||
82 | |||
83 | ## scheduler and work queue | ||
84 | |||
85 | global const Workqueue = Task[] | ||
86 | |||
87 | function enq_work(t::Task) | ||
88 | t.state == :runnable || error("schedule: Task not runnable") | ||
89 | ccall(:uv_stop, Cvoid, (Ptr{Cvoid},), eventloop()) | ||
90 | push!(Workqueue, t) | ||
91 | t.state = :queued | ||
92 | return t | ||
93 | end | ||
94 | |||
95 | schedule(t::Task) = enq_work(t) | ||
96 | |||
97 | """ | ||
98 | schedule(t::Task, [val]; error=false) | ||
99 | |||
100 | Add a [`Task`](@ref) to the scheduler's queue. This causes the task to run constantly when the system | ||
101 | is otherwise idle, unless the task performs a blocking operation such as [`wait`](@ref). | ||
102 | |||
103 | If a second argument `val` is provided, it will be passed to the task (via the return value of | ||
104 | [`yieldto`](@ref)) when it runs again. If `error` is `true`, the value is raised as an exception in | ||
105 | the woken task. | ||
106 | |||
107 | # Examples | ||
108 | ```jldoctest | ||
109 | julia> a5() = sum(i for i in 1:1000); | ||
110 | |||
111 | julia> b = Task(a5); | ||
112 | |||
113 | julia> istaskstarted(b) | ||
114 | false | ||
115 | |||
116 | julia> schedule(b); | ||
117 | |||
118 | julia> yield(); | ||
119 | |||
120 | julia> istaskstarted(b) | ||
121 | true | ||
122 | |||
123 | julia> istaskdone(b) | ||
124 | true | ||
125 | ``` | ||
126 | """ | ||
127 | function schedule(t::Task, arg; error=false) | ||
128 | # schedule a task to be (re)started with the given value or exception | ||
129 | if error | ||
130 | t.exception = arg | ||
131 | else | ||
132 | t.result = arg | ||
133 | end | ||
134 | return enq_work(t) | ||
135 | end | ||
136 | |||
137 | # fast version of `schedule(t, arg); wait()` | ||
138 | function schedule_and_wait(t::Task, arg=nothing) | ||
139 | t.state == :runnable || error("schedule: Task not runnable") | ||
140 | if isempty(Workqueue) | ||
141 | return yieldto(t, arg) | ||
142 | else | ||
143 | t.result = arg | ||
144 | push!(Workqueue, t) | ||
145 | t.state = :queued | ||
146 | end | ||
147 | return wait() | ||
148 | end | ||
149 | |||
150 | """ | ||
151 | yield() | ||
152 | |||
153 | Switch to the scheduler to allow another scheduled task to run. A task that calls this | ||
154 | function is still runnable, and will be restarted immediately if there are no other runnable | ||
155 | tasks. | ||
156 | """ | ||
157 | yield() = (enq_work(current_task()); wait()) | ||
158 | |||
159 | """ | ||
160 | yield(t::Task, arg = nothing) | ||
161 | |||
162 | A fast, unfair-scheduling version of `schedule(t, arg); yield()` which | ||
163 | immediately yields to `t` before calling the scheduler. | ||
164 | """ | ||
165 | function yield(t::Task, @nospecialize x = nothing) | ||
166 | t.state == :runnable || error("schedule: Task not runnable") | ||
167 | t.result = x | ||
168 | enq_work(current_task()) | ||
169 | return try_yieldto(ensure_rescheduled, Ref(t)) | ||
170 | end | ||
171 | |||
172 | """ | ||
173 | yieldto(t::Task, arg = nothing) | ||
174 | |||
175 | Switch to the given task. The first time a task is switched to, the task's function is | ||
176 | called with no arguments. On subsequent switches, `arg` is returned from the task's last | ||
177 | call to `yieldto`. This is a low-level call that only switches tasks, not considering states | ||
178 | or scheduling in any way. Its use is discouraged. | ||
179 | """ | ||
180 | function yieldto(t::Task, @nospecialize x = nothing) | ||
181 | t.result = x | ||
182 | return try_yieldto(identity, Ref(t)) | ||
183 | end | ||
184 | |||
185 | function try_yieldto(undo, reftask::Ref{Task}) | ||
186 | try | ||
187 | ccall(:jl_switchto, Cvoid, (Any,), reftask) | ||
188 | catch e | ||
189 | undo(reftask[]) | ||
190 | rethrow(e) | ||
191 | end | ||
192 | ct = current_task() | ||
193 | exc = ct.exception | ||
194 | if exc !== nothing | ||
195 | ct.exception = nothing | ||
196 | throw(exc) | ||
197 | end | ||
198 | result = ct.result | ||
199 | ct.result = nothing | ||
200 | return result | ||
201 | end | ||
202 | |||
203 | # yield to a task, throwing an exception in it | ||
204 | function throwto(t::Task, @nospecialize exc) | ||
205 | t.exception = exc | ||
206 | return yieldto(t) | ||
207 | end | ||
208 | |||
209 | function ensure_rescheduled(othertask::Task) | ||
210 | ct = current_task() | ||
211 | if ct !== othertask && othertask.state == :runnable | ||
212 | # we failed to yield to othertask | ||
213 | # return it to the head of the queue to be scheduled later | ||
214 | pushfirst!(Workqueue, othertask) | ||
215 | othertask.state = :queued | ||
216 | end | ||
217 | if ct.state == :queued | ||
218 | # if the current task was queued, | ||
219 | # also need to return it to the runnable state | ||
220 | # before throwing an error | ||
221 | i = findfirst(t->t===ct, Workqueue) | ||
222 | i === nothing || deleteat!(Workqueue, i) | ||
223 | ct.state = :runnable | ||
224 | end | ||
225 | nothing | ||
226 | end | ||
227 | |||
228 | @noinline function poptask() | ||
229 | t = popfirst!(Workqueue) | ||
230 | if t.state != :queued | ||
231 | # assume this somehow got queued twice, | ||
232 | # probably broken now, but try discarding this switch and keep going | ||
233 | # can't throw here, because it's probably not the fault of the caller to wait | ||
234 | # and don't want to use print() here, because that may try to incur a task switch | ||
235 | ccall(:jl_safe_printf, Cvoid, (Ptr{UInt8}, Int32...), | ||
236 | "\nWARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state != :queued\n") | ||
237 | return | ||
238 | end | ||
239 | t.state = :runnable | ||
240 | return Ref(t) | ||
241 | end | ||
242 | |||
243 | function wait() | ||
244 | while true | ||
245 | if isempty(Workqueue) | ||
246 | 3 (7.14%) |
3 (100.00%)
samples spent calling
process_events
c = process_events(true)
|
|
247 | if c == 0 && eventloop() != C_NULL && isempty(Workqueue) | ||
248 | # if there are no active handles and no runnable tasks, just | ||
249 | # wait for signals. | ||
250 | pause() | ||
251 | end | ||
252 | else | ||
253 | reftask = poptask() | ||
254 | if reftask !== nothing | ||
255 | result = try_yieldto(ensure_rescheduled, reftask) | ||
256 | process_events(false) | ||
257 | # return when we come out of the queue | ||
258 | return result | ||
259 | end | ||
260 | end | ||
261 | end | ||
262 | # unreachable | ||
263 | end | ||
264 | |||
265 | if Sys.iswindows() | ||
266 | pause() = ccall(:Sleep, stdcall, Cvoid, (UInt32,), 0xffffffff) | ||
267 | else | ||
268 | pause() = ccall(:pause, Cvoid, ()) | ||
269 | end | ||
270 | |||
271 | |||
272 | ## async event notifications | ||
273 | |||
274 | """ | ||
275 | AsyncCondition() | ||
276 | |||
277 | Create a async condition that wakes up tasks waiting for it | ||
278 | (by calling [`wait`](@ref) on the object) | ||
279 | when notified from C by a call to `uv_async_send`. | ||
280 | Waiting tasks are woken with an error when the object is closed (by [`close`](@ref). | ||
281 | Use [`isopen`](@ref) to check whether it is still active. | ||
282 | """ | ||
283 | mutable struct AsyncCondition | ||
284 | handle::Ptr{Cvoid} | ||
285 | cond::Condition | ||
286 | isopen::Bool | ||
287 | |||
288 | function AsyncCondition() | ||
289 | this = new(Libc.malloc(_sizeof_uv_async), Condition(), true) | ||
290 | associate_julia_struct(this.handle, this) | ||
291 | finalizer(uvfinalize, this) | ||
292 | err = ccall(:uv_async_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}), | ||
293 | eventloop(), this, uv_jl_asynccb::Ptr{Cvoid}) | ||
294 | if err != 0 | ||
295 | #TODO: this codepath is currently not tested | ||
296 | Libc.free(this.handle) | ||
297 | this.handle = C_NULL | ||
298 | throw(_UVError("uv_async_init", err)) | ||
299 | end | ||
300 | return this | ||
301 | end | ||
302 | end | ||
303 | |||
304 | """ | ||
305 | AsyncCondition(callback::Function) | ||
306 | |||
307 | Create a async condition that calls the given `callback` function. The `callback` is passed one argument, | ||
308 | the async condition object itself. | ||
309 | """ | ||
310 | function AsyncCondition(cb::Function) | ||
311 | async = AsyncCondition() | ||
312 | waiter = Task(function() | ||
313 | while isopen(async) | ||
314 | success = try | ||
315 | wait(async) | ||
316 | true | ||
317 | catch exc # ignore possible exception on close() | ||
318 | isa(exc, EOFError) || rethrow(exc) | ||
319 | end | ||
320 | success && cb(async) | ||
321 | end | ||
322 | end) | ||
323 | # must start the task right away so that it can wait for the AsyncCondition before | ||
324 | # we re-enter the event loop. this avoids a race condition. see issue #12719 | ||
325 | yield(waiter) | ||
326 | return async | ||
327 | end | ||
328 | |||
329 | ## timer-based notifications | ||
330 | |||
331 | """ | ||
332 | Timer(delay; interval = 0) | ||
333 | |||
334 | Create a timer that wakes up tasks waiting for it (by calling [`wait`](@ref) on the timer object). | ||
335 | |||
336 | Waiting tasks are woken after an initial delay of `delay` seconds, and then repeating with the given | ||
337 | `interval` in seconds. If `interval` is equal to `0`, the timer is only triggered once. When | ||
338 | the timer is closed (by [`close`](@ref) waiting tasks are woken with an error. Use [`isopen`](@ref) | ||
339 | to check whether a timer is still active. | ||
340 | """ | ||
341 | mutable struct Timer | ||
342 | handle::Ptr{Cvoid} | ||
343 | cond::Condition | ||
344 | isopen::Bool | ||
345 | |||
346 | function Timer(timeout::Real; interval::Real = 0.0) | ||
347 | timeout ≥ 0 || throw(ArgumentError("timer cannot have negative timeout of $timeout seconds")) | ||
348 | interval ≥ 0 || throw(ArgumentError("timer cannot have negative repeat interval of $interval seconds")) | ||
349 | |||
350 | this = new(Libc.malloc(_sizeof_uv_timer), Condition(), true) | ||
351 | err = ccall(:uv_timer_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}), eventloop(), this) | ||
352 | if err != 0 | ||
353 | #TODO: this codepath is currently not tested | ||
354 | Libc.free(this.handle) | ||
355 | this.handle = C_NULL | ||
356 | throw(_UVError("uv_timer_init", err)) | ||
357 | end | ||
358 | |||
359 | associate_julia_struct(this.handle, this) | ||
360 | finalizer(uvfinalize, this) | ||
361 | |||
362 | ccall(:uv_update_time, Cvoid, (Ptr{Cvoid},), eventloop()) | ||
363 | ccall(:uv_timer_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, UInt64, UInt64), | ||
364 | this, uv_jl_timercb::Ptr{Cvoid}, | ||
365 | UInt64(round(timeout * 1000)) + 1, UInt64(round(interval * 1000))) | ||
366 | return this | ||
367 | end | ||
368 | end | ||
369 | |||
370 | unsafe_convert(::Type{Ptr{Cvoid}}, t::Timer) = t.handle | ||
371 | unsafe_convert(::Type{Ptr{Cvoid}}, async::AsyncCondition) = async.handle | ||
372 | |||
373 | function wait(t::Union{Timer, AsyncCondition}) | ||
374 | isopen(t) || throw(EOFError()) | ||
375 | stream_wait(t, t.cond) | ||
376 | end | ||
377 | |||
378 | isopen(t::Union{Timer, AsyncCondition}) = t.isopen | ||
379 | |||
380 | function close(t::Union{Timer, AsyncCondition}) | ||
381 | if t.handle != C_NULL && isopen(t) | ||
382 | t.isopen = false | ||
383 | isa(t, Timer) && ccall(:uv_timer_stop, Cint, (Ptr{Cvoid},), t) | ||
384 | ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t) | ||
385 | end | ||
386 | nothing | ||
387 | end | ||
388 | |||
389 | function uvfinalize(t::Union{Timer, AsyncCondition}) | ||
390 | if t.handle != C_NULL | ||
391 | disassociate_julia_struct(t.handle) # not going to call the usual close hooks | ||
392 | close(t) | ||
393 | t.handle = C_NULL | ||
394 | end | ||
395 | t.isopen = false | ||
396 | nothing | ||
397 | end | ||
398 | |||
399 | function _uv_hook_close(t::Union{Timer, AsyncCondition}) | ||
400 | uvfinalize(t) | ||
401 | notify_error(t.cond, EOFError()) | ||
402 | nothing | ||
403 | end | ||
404 | |||
405 | function uv_asynccb(handle::Ptr{Cvoid}) | ||
406 | async = @handle_as handle AsyncCondition | ||
407 | notify(async.cond) | ||
408 | nothing | ||
409 | end | ||
410 | |||
411 | function uv_timercb(handle::Ptr{Cvoid}) | ||
412 | t = @handle_as handle Timer | ||
413 | if ccall(:uv_timer_get_repeat, UInt64, (Ptr{Cvoid},), t) == 0 | ||
414 | # timer is stopped now | ||
415 | close(t) | ||
416 | end | ||
417 | notify(t.cond) | ||
418 | nothing | ||
419 | end | ||
420 | |||
421 | """ | ||
422 | sleep(seconds) | ||
423 | |||
424 | Block the current task for a specified number of seconds. The minimum sleep time is 1 | ||
425 | millisecond or input of `0.001`. | ||
426 | """ | ||
427 | function sleep(sec::Real) | ||
428 | sec ≥ 0 || throw(ArgumentError("cannot sleep for $sec seconds")) | ||
429 | wait(Timer(sec)) | ||
430 | nothing | ||
431 | end | ||
432 | |||
433 | # timer with repeated callback | ||
434 | """ | ||
435 | Timer(callback::Function, delay; interval = 0) | ||
436 | |||
437 | Create a timer that wakes up tasks waiting for it (by calling [`wait`](@ref) on the timer object) and | ||
438 | calls the function `callback`. | ||
439 | |||
440 | Waiting tasks are woken and the function `callback` is called after an initial delay of `delay` seconds, | ||
441 | and then repeating with the given `interval` in seconds. If `interval` is equal to `0`, the timer | ||
442 | is only triggered once. The function `callback` is called with a single argument, the timer itself. | ||
443 | When the timer is closed (by [`close`](@ref) waiting tasks are woken with an error. Use [`isopen`](@ref) | ||
444 | to check whether a timer is still active. | ||
445 | |||
446 | # Examples | ||
447 | |||
448 | Here the first number is printed after a delay of two seconds, then the following numbers are printed quickly. | ||
449 | |||
450 | ```julia-repl | ||
451 | julia> begin | ||
452 | i = 0 | ||
453 | cb(timer) = (global i += 1; println(i)) | ||
454 | t = Timer(cb, 2, interval = 0.2) | ||
455 | wait(t) | ||
456 | sleep(0.5) | ||
457 | close(t) | ||
458 | end | ||
459 | 1 | ||
460 | 2 | ||
461 | 3 | ||
462 | ``` | ||
463 | """ | ||
464 | function Timer(cb::Function, timeout::Real; interval::Real = 0.0) | ||
465 | t = Timer(timeout, interval = interval) | ||
466 | waiter = Task(function() | ||
467 | while isopen(t) | ||
468 | success = try | ||
469 | wait(t) | ||
470 | true | ||
471 | catch exc # ignore possible exception on close() | ||
472 | isa(exc, EOFError) || rethrow(exc) | ||
473 | false | ||
474 | end | ||
475 | success && cb(t) | ||
476 | end | ||
477 | end) | ||
478 | # must start the task right away so that it can wait for the Timer before | ||
479 | # we re-enter the event loop. this avoids a race condition. see issue #12719 | ||
480 | yield(waiter) | ||
481 | return t | ||
482 | end |