StatProfilerHTML.jl report
Generated on tor 10 okt 2019 11:38:33
File source code
Line Exclusive Inclusive Code
1 # This file is a part of Julia. License is MIT: https://julialang.org/license
2
3 ## condition variables
4
5 """
6 Condition()
7
8 Create an edge-triggered event source that tasks can wait for. Tasks that call [`wait`](@ref) on a
9 `Condition` are suspended and queued. Tasks are woken up when [`notify`](@ref) is later called on
10 the `Condition`. Edge triggering means that only tasks waiting at the time [`notify`](@ref) is
11 called can be woken up. For level-triggered notifications, you must keep extra state to keep
12 track of whether a notification has happened. The [`Channel`](@ref) type does
13 this, and so can be used for level-triggered events.
14 """
15 mutable struct Condition
16 waitq::Vector{Any}
17
18 Condition() = new([])
19 end
20
21 """
22 wait([x])
23
24 Block the current task until some event occurs, depending on the type of the argument:
25
26 * [`Channel`](@ref): Wait for a value to be appended to the channel.
27 * [`Condition`](@ref): Wait for [`notify`](@ref) on a condition.
28 * `Process`: Wait for a process or process chain to exit. The `exitcode` field of a process
29 can be used to determine success or failure.
30 * [`Task`](@ref): Wait for a `Task` to finish. If the task fails with an exception, the
31 exception is propagated (re-thrown in the task that called `wait`).
32 * [`RawFD`](@ref): Wait for changes on a file descriptor (see the `FileWatching` package).
33
34 If no argument is passed, the task blocks for an undefined period. A task can only be
35 restarted by an explicit call to [`schedule`](@ref) or [`yieldto`](@ref).
36
37 Often `wait` is called within a `while` loop to ensure a waited-for condition is met before
38 proceeding.
39 """
40 function wait(c::Condition)
41 ct = current_task()
42
43 push!(c.waitq, ct)
44
45 try
46 return wait()
47 catch
48 filter!(x->x!==ct, c.waitq)
49 rethrow()
50 end
51 end
52
53 """
54 notify(condition, val=nothing; all=true, error=false)
55
56 Wake up tasks waiting for a condition, passing them `val`. If `all` is `true` (the default),
57 all waiting tasks are woken, otherwise only one is. If `error` is `true`, the passed value
58 is raised as an exception in the woken tasks.
59
60 Return the count of tasks woken up. Return 0 if no tasks are waiting on `condition`.
61 """
62 notify(c::Condition, @nospecialize(arg = nothing); all=true, error=false) = notify(c, arg, all, error)
63 function notify(c::Condition, arg, all, error)
64 cnt = 0
65 if all
66 cnt = length(c.waitq)
67 for t in c.waitq
68 error ? schedule(t, arg, error=error) : schedule(t, arg)
69 end
70 empty!(c.waitq)
71 elseif !isempty(c.waitq)
72 cnt = 1
73 t = popfirst!(c.waitq)
74 error ? schedule(t, arg, error=error) : schedule(t, arg)
75 end
76 cnt
77 end
78
79 notify_error(c::Condition, err) = notify(c, err, true, true)
80
81 n_waiters(c::Condition) = length(c.waitq)
82
83 ## scheduler and work queue
84
85 global const Workqueue = Task[]
86
87 function enq_work(t::Task)
88 t.state == :runnable || error("schedule: Task not runnable")
89 ccall(:uv_stop, Cvoid, (Ptr{Cvoid},), eventloop())
90 push!(Workqueue, t)
91 t.state = :queued
92 return t
93 end
94
95 schedule(t::Task) = enq_work(t)
96
97 """
98 schedule(t::Task, [val]; error=false)
99
100 Add a [`Task`](@ref) to the scheduler's queue. This causes the task to run constantly when the system
101 is otherwise idle, unless the task performs a blocking operation such as [`wait`](@ref).
102
103 If a second argument `val` is provided, it will be passed to the task (via the return value of
104 [`yieldto`](@ref)) when it runs again. If `error` is `true`, the value is raised as an exception in
105 the woken task.
106
107 # Examples
108 ```jldoctest
109 julia> a5() = sum(i for i in 1:1000);
110
111 julia> b = Task(a5);
112
113 julia> istaskstarted(b)
114 false
115
116 julia> schedule(b);
117
118 julia> yield();
119
120 julia> istaskstarted(b)
121 true
122
123 julia> istaskdone(b)
124 true
125 ```
126 """
127 function schedule(t::Task, arg; error=false)
128 # schedule a task to be (re)started with the given value or exception
129 if error
130 t.exception = arg
131 else
132 t.result = arg
133 end
134 return enq_work(t)
135 end
136
137 # fast version of `schedule(t, arg); wait()`
138 function schedule_and_wait(t::Task, arg=nothing)
139 t.state == :runnable || error("schedule: Task not runnable")
140 if isempty(Workqueue)
141 return yieldto(t, arg)
142 else
143 t.result = arg
144 push!(Workqueue, t)
145 t.state = :queued
146 end
147 return wait()
148 end
149
150 """
151 yield()
152
153 Switch to the scheduler to allow another scheduled task to run. A task that calls this
154 function is still runnable, and will be restarted immediately if there are no other runnable
155 tasks.
156 """
157 yield() = (enq_work(current_task()); wait())
158
159 """
160 yield(t::Task, arg = nothing)
161
162 A fast, unfair-scheduling version of `schedule(t, arg); yield()` which
163 immediately yields to `t` before calling the scheduler.
164 """
165 function yield(t::Task, @nospecialize x = nothing)
166 t.state == :runnable || error("schedule: Task not runnable")
167 t.result = x
168 enq_work(current_task())
169 return try_yieldto(ensure_rescheduled, Ref(t))
170 end
171
172 """
173 yieldto(t::Task, arg = nothing)
174
175 Switch to the given task. The first time a task is switched to, the task's function is
176 called with no arguments. On subsequent switches, `arg` is returned from the task's last
177 call to `yieldto`. This is a low-level call that only switches tasks, not considering states
178 or scheduling in any way. Its use is discouraged.
179 """
180 function yieldto(t::Task, @nospecialize x = nothing)
181 t.result = x
182 return try_yieldto(identity, Ref(t))
183 end
184
185 function try_yieldto(undo, reftask::Ref{Task})
186 try
187 ccall(:jl_switchto, Cvoid, (Any,), reftask)
188 catch e
189 undo(reftask[])
190 rethrow(e)
191 end
192 ct = current_task()
193 exc = ct.exception
194 if exc !== nothing
195 ct.exception = nothing
196 throw(exc)
197 end
198 result = ct.result
199 ct.result = nothing
200 return result
201 end
202
203 # yield to a task, throwing an exception in it
204 function throwto(t::Task, @nospecialize exc)
205 t.exception = exc
206 return yieldto(t)
207 end
208
209 function ensure_rescheduled(othertask::Task)
210 ct = current_task()
211 if ct !== othertask && othertask.state == :runnable
212 # we failed to yield to othertask
213 # return it to the head of the queue to be scheduled later
214 pushfirst!(Workqueue, othertask)
215 othertask.state = :queued
216 end
217 if ct.state == :queued
218 # if the current task was queued,
219 # also need to return it to the runnable state
220 # before throwing an error
221 i = findfirst(t->t===ct, Workqueue)
222 i === nothing || deleteat!(Workqueue, i)
223 ct.state = :runnable
224 end
225 nothing
226 end
227
228 @noinline function poptask()
229 t = popfirst!(Workqueue)
230 if t.state != :queued
231 # assume this somehow got queued twice,
232 # probably broken now, but try discarding this switch and keep going
233 # can't throw here, because it's probably not the fault of the caller to wait
234 # and don't want to use print() here, because that may try to incur a task switch
235 ccall(:jl_safe_printf, Cvoid, (Ptr{UInt8}, Int32...),
236 "\nWARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state != :queued\n")
237 return
238 end
239 t.state = :runnable
240 return Ref(t)
241 end
242
243
3 (7.14%) samples spent in wait
0 (ex.), 3 (100.00%) (incl.) when called from uv_write line 782
function wait()
244 while true
245 if isempty(Workqueue)
246 3 (7.14%)
3 (100.00%) samples spent calling process_events
c = process_events(true)
247 if c == 0 && eventloop() != C_NULL && isempty(Workqueue)
248 # if there are no active handles and no runnable tasks, just
249 # wait for signals.
250 pause()
251 end
252 else
253 reftask = poptask()
254 if reftask !== nothing
255 result = try_yieldto(ensure_rescheduled, reftask)
256 process_events(false)
257 # return when we come out of the queue
258 return result
259 end
260 end
261 end
262 # unreachable
263 end
264
265 if Sys.iswindows()
266 pause() = ccall(:Sleep, stdcall, Cvoid, (UInt32,), 0xffffffff)
267 else
268 pause() = ccall(:pause, Cvoid, ())
269 end
270
271
272 ## async event notifications
273
274 """
275 AsyncCondition()
276
277 Create a async condition that wakes up tasks waiting for it
278 (by calling [`wait`](@ref) on the object)
279 when notified from C by a call to `uv_async_send`.
280 Waiting tasks are woken with an error when the object is closed (by [`close`](@ref).
281 Use [`isopen`](@ref) to check whether it is still active.
282 """
283 mutable struct AsyncCondition
284 handle::Ptr{Cvoid}
285 cond::Condition
286 isopen::Bool
287
288 function AsyncCondition()
289 this = new(Libc.malloc(_sizeof_uv_async), Condition(), true)
290 associate_julia_struct(this.handle, this)
291 finalizer(uvfinalize, this)
292 err = ccall(:uv_async_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}),
293 eventloop(), this, uv_jl_asynccb::Ptr{Cvoid})
294 if err != 0
295 #TODO: this codepath is currently not tested
296 Libc.free(this.handle)
297 this.handle = C_NULL
298 throw(_UVError("uv_async_init", err))
299 end
300 return this
301 end
302 end
303
304 """
305 AsyncCondition(callback::Function)
306
307 Create a async condition that calls the given `callback` function. The `callback` is passed one argument,
308 the async condition object itself.
309 """
310 function AsyncCondition(cb::Function)
311 async = AsyncCondition()
312 waiter = Task(function()
313 while isopen(async)
314 success = try
315 wait(async)
316 true
317 catch exc # ignore possible exception on close()
318 isa(exc, EOFError) || rethrow(exc)
319 end
320 success && cb(async)
321 end
322 end)
323 # must start the task right away so that it can wait for the AsyncCondition before
324 # we re-enter the event loop. this avoids a race condition. see issue #12719
325 yield(waiter)
326 return async
327 end
328
329 ## timer-based notifications
330
331 """
332 Timer(delay; interval = 0)
333
334 Create a timer that wakes up tasks waiting for it (by calling [`wait`](@ref) on the timer object).
335
336 Waiting tasks are woken after an initial delay of `delay` seconds, and then repeating with the given
337 `interval` in seconds. If `interval` is equal to `0`, the timer is only triggered once. When
338 the timer is closed (by [`close`](@ref) waiting tasks are woken with an error. Use [`isopen`](@ref)
339 to check whether a timer is still active.
340 """
341 mutable struct Timer
342 handle::Ptr{Cvoid}
343 cond::Condition
344 isopen::Bool
345
346 function Timer(timeout::Real; interval::Real = 0.0)
347 timeout ≥ 0 || throw(ArgumentError("timer cannot have negative timeout of $timeout seconds"))
348 interval ≥ 0 || throw(ArgumentError("timer cannot have negative repeat interval of $interval seconds"))
349
350 this = new(Libc.malloc(_sizeof_uv_timer), Condition(), true)
351 err = ccall(:uv_timer_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}), eventloop(), this)
352 if err != 0
353 #TODO: this codepath is currently not tested
354 Libc.free(this.handle)
355 this.handle = C_NULL
356 throw(_UVError("uv_timer_init", err))
357 end
358
359 associate_julia_struct(this.handle, this)
360 finalizer(uvfinalize, this)
361
362 ccall(:uv_update_time, Cvoid, (Ptr{Cvoid},), eventloop())
363 ccall(:uv_timer_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, UInt64, UInt64),
364 this, uv_jl_timercb::Ptr{Cvoid},
365 UInt64(round(timeout * 1000)) + 1, UInt64(round(interval * 1000)))
366 return this
367 end
368 end
369
370 unsafe_convert(::Type{Ptr{Cvoid}}, t::Timer) = t.handle
371 unsafe_convert(::Type{Ptr{Cvoid}}, async::AsyncCondition) = async.handle
372
373 function wait(t::Union{Timer, AsyncCondition})
374 isopen(t) || throw(EOFError())
375 stream_wait(t, t.cond)
376 end
377
378 isopen(t::Union{Timer, AsyncCondition}) = t.isopen
379
380 function close(t::Union{Timer, AsyncCondition})
381 if t.handle != C_NULL && isopen(t)
382 t.isopen = false
383 isa(t, Timer) && ccall(:uv_timer_stop, Cint, (Ptr{Cvoid},), t)
384 ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t)
385 end
386 nothing
387 end
388
389 function uvfinalize(t::Union{Timer, AsyncCondition})
390 if t.handle != C_NULL
391 disassociate_julia_struct(t.handle) # not going to call the usual close hooks
392 close(t)
393 t.handle = C_NULL
394 end
395 t.isopen = false
396 nothing
397 end
398
399 function _uv_hook_close(t::Union{Timer, AsyncCondition})
400 uvfinalize(t)
401 notify_error(t.cond, EOFError())
402 nothing
403 end
404
405 function uv_asynccb(handle::Ptr{Cvoid})
406 async = @handle_as handle AsyncCondition
407 notify(async.cond)
408 nothing
409 end
410
411 function uv_timercb(handle::Ptr{Cvoid})
412 t = @handle_as handle Timer
413 if ccall(:uv_timer_get_repeat, UInt64, (Ptr{Cvoid},), t) == 0
414 # timer is stopped now
415 close(t)
416 end
417 notify(t.cond)
418 nothing
419 end
420
421 """
422 sleep(seconds)
423
424 Block the current task for a specified number of seconds. The minimum sleep time is 1
425 millisecond or input of `0.001`.
426 """
427 function sleep(sec::Real)
428 sec ≥ 0 || throw(ArgumentError("cannot sleep for $sec seconds"))
429 wait(Timer(sec))
430 nothing
431 end
432
433 # timer with repeated callback
434 """
435 Timer(callback::Function, delay; interval = 0)
436
437 Create a timer that wakes up tasks waiting for it (by calling [`wait`](@ref) on the timer object) and
438 calls the function `callback`.
439
440 Waiting tasks are woken and the function `callback` is called after an initial delay of `delay` seconds,
441 and then repeating with the given `interval` in seconds. If `interval` is equal to `0`, the timer
442 is only triggered once. The function `callback` is called with a single argument, the timer itself.
443 When the timer is closed (by [`close`](@ref) waiting tasks are woken with an error. Use [`isopen`](@ref)
444 to check whether a timer is still active.
445
446 # Examples
447
448 Here the first number is printed after a delay of two seconds, then the following numbers are printed quickly.
449
450 ```julia-repl
451 julia> begin
452 i = 0
453 cb(timer) = (global i += 1; println(i))
454 t = Timer(cb, 2, interval = 0.2)
455 wait(t)
456 sleep(0.5)
457 close(t)
458 end
459 1
460 2
461 3
462 ```
463 """
464 function Timer(cb::Function, timeout::Real; interval::Real = 0.0)
465 t = Timer(timeout, interval = interval)
466 waiter = Task(function()
467 while isopen(t)
468 success = try
469 wait(t)
470 true
471 catch exc # ignore possible exception on close()
472 isa(exc, EOFError) || rethrow(exc)
473 false
474 end
475 success && cb(t)
476 end
477 end)
478 # must start the task right away so that it can wait for the Timer before
479 # we re-enter the event loop. this avoids a race condition. see issue #12719
480 yield(waiter)
481 return t
482 end