StatProfilerHTML.jl report
Generated on tor 10 okt 2019 11:38:33
File source code
Line Exclusive Inclusive Code
1 # This file is a part of Julia. License is MIT: https://julialang.org/license
2
3 import .Libc: RawFD, dup
4 if Sys.iswindows()
5 import .Libc: WindowsRawSocket
6 const OS_HANDLE = WindowsRawSocket
7 const INVALID_OS_HANDLE = WindowsRawSocket(Ptr{Cvoid}(-1))
8 else
9 const OS_HANDLE = RawFD
10 const INVALID_OS_HANDLE = RawFD(-1)
11 end
12
13
14 ## types ##
15 abstract type IOServer end
16 abstract type LibuvServer <: IOServer end
17 abstract type LibuvStream <: IO end
18
19
20 # IO
21 # +- GenericIOBuffer{T<:AbstractArray{UInt8,1}} (not exported)
22 # +- AbstractPipe (not exported)
23 # . +- Pipe
24 # . +- Process (not exported)
25 # . +- ProcessChain (not exported)
26 # +- BufferStream
27 # +- DevNull (not exported)
28 # +- Filesystem.File
29 # +- LibuvStream (not exported)
30 # . +- PipeEndpoint (not exported)
31 # . +- TCPSocket
32 # . +- TTY (not exported)
33 # . +- UDPSocket
34 # +- IOBuffer = Base.GenericIOBuffer{Array{UInt8,1}}
35 # +- IOStream
36
37 # IOServer
38 # +- LibuvServer
39 # . +- PipeServer
40 # . +- TCPServer
41
42 # Redirectable = Union{IO, FileRedirect, Libc.RawFD} (not exported)
43
44 function stream_wait(x, c...) # for x::LibuvObject
45 preserve_handle(x)
46 try
47 return wait(c...)
48 finally
49 unpreserve_handle(x)
50 end
51 end
52
53 bytesavailable(s::LibuvStream) = bytesavailable(s.buffer)
54
55 function eof(s::LibuvStream)
56 if isopen(s) # fast path
57 bytesavailable(s) > 0 && return false
58 else
59 return bytesavailable(s) <= 0
60 end
61 wait_readnb(s,1)
62 return !isopen(s) && bytesavailable(s) <= 0
63 end
64
65 # Limit our default maximum read and buffer size,
66 # to avoid DoS-ing ourself into an OOM situation
67 const DEFAULT_READ_BUFFER_SZ = 10485760 # 10 MB
68
69 # manually limit our write size, if the OS doesn't support full-size writes
70 if Sys.iswindows()
71 const MAX_OS_WRITE = UInt(0x1FF0_0000) # 511 MB (determined semi-empirically, limited to 31 MB on XP)
72 else
73 const MAX_OS_WRITE = UInt(typemax(Csize_t))
74 end
75
76
77 const StatusUninit = 0 # handle is allocated, but not initialized
78 const StatusInit = 1 # handle is valid, but not connected/active
79 const StatusConnecting = 2 # handle is in process of connecting
80 const StatusOpen = 3 # handle is usable
81 const StatusActive = 4 # handle is listening for read/write/connect events
82 const StatusClosing = 5 # handle is closing / being closed
83 const StatusClosed = 6 # handle is closed
84 const StatusEOF = 7 # handle is a TTY that has seen an EOF event
85 const StatusPaused = 8 # handle is Active, but not consuming events, and will transition to Open if it receives an event
86 function uv_status_string(x)
87 s = x.status
88 if x.handle == C_NULL
89 if s == StatusClosed
90 return "closed"
91 elseif s == StatusUninit
92 return "null"
93 end
94 return "invalid status"
95 elseif s == StatusUninit
96 return "uninit"
97 elseif s == StatusInit
98 return "init"
99 elseif s == StatusConnecting
100 return "connecting"
101 elseif s == StatusOpen
102 return "open"
103 elseif s == StatusActive
104 return "active"
105 elseif s == StatusPaused
106 return "paused"
107 elseif s == StatusClosing
108 return "closing"
109 elseif s == StatusClosed
110 return "closed"
111 elseif s == StatusEOF
112 return "eof"
113 end
114 return "invalid status"
115 end
116
117 mutable struct PipeEndpoint <: LibuvStream
118 handle::Ptr{Cvoid}
119 status::Int
120 buffer::IOBuffer
121 readnotify::Condition
122 connectnotify::Condition
123 closenotify::Condition
124 sendbuf::Union{IOBuffer, Nothing}
125 lock::ReentrantLock
126 throttle::Int
127 function PipeEndpoint(handle::Ptr{Cvoid}, status)
128 p = new(handle,
129 status,
130 PipeBuffer(),
131 Condition(),
132 Condition(),
133 Condition(),
134 nothing,
135 ReentrantLock(),
136 DEFAULT_READ_BUFFER_SZ)
137 associate_julia_struct(handle, p)
138 finalizer(uvfinalize, p)
139 return p
140 end
141 end
142
143 function PipeEndpoint()
144 pipe = PipeEndpoint(Libc.malloc(_sizeof_uv_named_pipe), StatusUninit)
145 err = ccall(:uv_pipe_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Cint), eventloop(), pipe.handle, 0)
146 uv_error("failed to create pipe endpoint", err)
147 pipe.status = StatusInit
148 return pipe
149 end
150
151 mutable struct TTY <: LibuvStream
152 handle::Ptr{Cvoid}
153 status::Int
154 buffer::IOBuffer
155 readnotify::Condition
156 closenotify::Condition
157 sendbuf::Union{IOBuffer, Nothing}
158 lock::ReentrantLock
159 throttle::Int
160 @static if Sys.iswindows(); ispty::Bool; end
161 function TTY(handle::Ptr{Cvoid}, status)
162 tty = new(
163 handle,
164 status,
165 PipeBuffer(),
166 Condition(),
167 Condition(),
168 nothing,
169 ReentrantLock(),
170 DEFAULT_READ_BUFFER_SZ)
171 associate_julia_struct(handle, tty)
172 finalizer(uvfinalize, tty)
173 @static if Sys.iswindows()
174 tty.ispty = ccall(:jl_ispty, Cint, (Ptr{Cvoid},), handle) != 0
175 end
176 return tty
177 end
178 end
179
180 function TTY(fd::RawFD; readable::Bool = false)
181 tty = TTY(Libc.malloc(_sizeof_uv_tty), StatusUninit)
182 # This needs to go after associate_julia_struct so that there
183 # is no garbage in the ->data field
184 err = ccall(:uv_tty_init, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, RawFD, Int32),
185 eventloop(), tty.handle, fd, readable)
186 uv_error("TTY", err)
187 tty.status = StatusOpen
188 return tty
189 end
190
191 show(io::IO, stream::LibuvServer) = print(io, typeof(stream), "(",
192 _fd(stream), " ",
193 uv_status_string(stream), ")")
194 show(io::IO, stream::LibuvStream) = print(io, typeof(stream), "(",
195 _fd(stream), " ",
196 uv_status_string(stream), ", ",
197 bytesavailable(stream.buffer), " bytes waiting)")
198
199 # Shared LibuvStream object interface
200
201 function isreadable(io::LibuvStream)
202 bytesavailable(io) > 0 && return true
203 isopen(io) || return false
204 return ccall(:uv_is_readable, Cint, (Ptr{Cvoid},), io.handle) != 0
205 end
206
207 function iswritable(io::LibuvStream)
208 isopen(io) || return false
209 io.status == StatusClosing && return false
210 return ccall(:uv_is_writable, Cint, (Ptr{Cvoid},), io.handle) != 0
211 end
212
213 lock(s::LibuvStream) = lock(s.lock)
214 unlock(s::LibuvStream) = unlock(s.lock)
215
216 rawhandle(stream::LibuvStream) = stream.handle
217 unsafe_convert(::Type{Ptr{Cvoid}}, s::Union{LibuvStream, LibuvServer}) = s.handle
218
219 function init_stdio(handle::Ptr{Cvoid})
220 t = ccall(:jl_uv_handle_type, Int32, (Ptr{Cvoid},), handle)
221 if t == UV_FILE
222 fd = ccall(:jl_uv_file_handle, OS_HANDLE, (Ptr{Cvoid},), handle)
223 # TODO: Replace ios.c file with libuv fs?
224 # return File(fd)
225 @static if Sys.iswindows()
226 # TODO: Get ios.c to understand native handles
227 fd = ccall(:_open_osfhandle, RawFD, (WindowsRawSocket, Int32), fd, 0)
228 end
229 # TODO: Get fdio to work natively with file descriptors instead of integers
230 return fdio(cconvert(Cint, fd))
231 elseif t == UV_TTY
232 return TTY(handle, StatusOpen)
233 elseif t == UV_TCP
234 Sockets = require(PkgId(UUID((0x6462fe0b_24de_5631, 0x8697_dd941f90decc)), "Sockets"))
235 return Sockets.TCPSocket(handle, StatusOpen)
236 elseif t == UV_NAMED_PIPE
237 return PipeEndpoint(handle, StatusOpen)
238 else
239 throw(ArgumentError("invalid stdio type: $t"))
240 end
241 end
242
243 function isopen(x::Union{LibuvStream, LibuvServer})
244 if x.status == StatusUninit || x.status == StatusInit
245 throw(ArgumentError("$x is not initialized"))
246 end
247 x.status != StatusClosed && x.status != StatusEOF
248 end
249
250 function check_open(x::Union{LibuvStream, LibuvServer})
251 if !isopen(x) || x.status == StatusClosing
252 throw(ArgumentError("stream is closed or unusable"))
253 end
254 end
255
256 function wait_connected(x::Union{LibuvStream, LibuvServer})
257 check_open(x)
258 while x.status == StatusConnecting
259 stream_wait(x, x.connectnotify)
260 check_open(x)
261 end
262 end
263
264 function wait_readbyte(x::LibuvStream, c::UInt8)
265 if isopen(x) # fast path
266 occursin(c, x.buffer) && return
267 else
268 return
269 end
270 preserve_handle(x)
271 try
272 while isopen(x) && !occursin(c, x.buffer)
273 start_reading(x) # ensure we are reading
274 wait(x.readnotify)
275 end
276 finally
277 if isempty(x.readnotify.waitq)
278 stop_reading(x) # stop reading iff there are currently no other read clients of the stream
279 end
280 unpreserve_handle(x)
281 end
282 nothing
283 end
284
285 function wait_readnb(x::LibuvStream, nb::Int)
286 if isopen(x) # fast path
287 bytesavailable(x.buffer) >= nb && return
288 else
289 return
290 end
291 oldthrottle = x.throttle
292 preserve_handle(x)
293 try
294 while isopen(x) && bytesavailable(x.buffer) < nb
295 x.throttle = max(nb, x.throttle)
296 start_reading(x) # ensure we are reading
297 wait(x.readnotify)
298 end
299 finally
300 if isempty(x.readnotify.waitq)
301 stop_reading(x) # stop reading iff there are currently no other read clients of the stream
302 end
303 if oldthrottle <= x.throttle <= nb
304 x.throttle = oldthrottle
305 end
306 unpreserve_handle(x)
307 end
308 nothing
309 end
310
311 function wait_close(x::Union{LibuvStream, LibuvServer})
312 if isopen(x)
313 stream_wait(x, x.closenotify)
314 end
315 nothing
316 end
317
318 function close(stream::Union{LibuvStream, LibuvServer})
319 if stream.status == StatusInit
320 ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
321 elseif isopen(stream)
322 if stream.status != StatusClosing
323 ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
324 stream.status = StatusClosing
325 end
326 if uv_handle_data(stream) != C_NULL
327 stream_wait(stream, stream.closenotify)
328 end
329 end
330 nothing
331 end
332
333 function uvfinalize(uv::Union{LibuvStream, LibuvServer})
334 if uv.handle != C_NULL
335 disassociate_julia_struct(uv.handle) # not going to call the usual close hooks
336 if uv.status != StatusUninit
337 close(uv)
338 else
339 Libc.free(uv.handle)
340 end
341 uv.status = StatusClosed
342 uv.handle = C_NULL
343 end
344 nothing
345 end
346
347 if Sys.iswindows()
348 ispty(s::TTY) = s.ispty
349 ispty(s::IO) = false
350 end
351
352 """
353 displaysize([io::IO]) -> (lines, columns)
354
355 Return the nominal size of the screen that may be used for rendering output to
356 this `IO` object.
357 If no input is provided, the environment variables `LINES` and `COLUMNS` are read.
358 If those are not set, a default size of `(24, 80)` is returned.
359
360 # Examples
361 ```jldoctest
362 julia> withenv("LINES" => 30, "COLUMNS" => 100) do
363 displaysize()
364 end
365 (30, 100)
366 ```
367
368 To get your TTY size,
369
370 ```julia
371 julia> displaysize(stdout)
372 (34, 147)
373 ```
374 """
375 displaysize(io::IO) = displaysize()
376 displaysize() = (parse(Int, get(ENV, "LINES", "24")),
377 parse(Int, get(ENV, "COLUMNS", "80")))::Tuple{Int, Int}
378
379 function displaysize(io::TTY)
380 local h::Int, w::Int
381 default_size = displaysize()
382
383 @static if Sys.iswindows()
384 if ispty(io)
385 # io is actually a libuv pipe but a cygwin/msys2 pty
386 try
387 h, w = parse.(Int, split(read(open(Base.Cmd(String["stty", "size"]), "r", io).out, String)))
388 h > 0 || (h = default_size[1])
389 w > 0 || (w = default_size[2])
390 return h, w
391 catch
392 return default_size
393 end
394 end
395 end
396
397 s1 = Ref{Int32}(0)
398 s2 = Ref{Int32}(0)
399 Base.uv_error("size (TTY)", ccall(:uv_tty_get_winsize,
400 Int32, (Ptr{Cvoid}, Ptr{Int32}, Ptr{Int32}),
401 io, s1, s2) != 0)
402 w, h = s1[], s2[]
403 h > 0 || (h = default_size[1])
404 w > 0 || (w = default_size[2])
405 return h, w
406 end
407
408 in(key_value::Pair{Symbol,Bool}, ::TTY) = key_value.first === :color && key_value.second === have_color
409 haskey(::TTY, key::Symbol) = key === :color
410 getindex(::TTY, key::Symbol) = key === :color ? have_color : throw(KeyError(key))
411 get(::TTY, key::Symbol, default) = key === :color ? have_color : default
412
413 ### Libuv callbacks ###
414
415 ## BUFFER ##
416 ## Allocate space in buffer (for immediate use)
417 function alloc_request(buffer::IOBuffer, recommended_size::UInt)
418 ensureroom(buffer, Int(recommended_size))
419 ptr = buffer.append ? buffer.size + 1 : buffer.ptr
420 nb = length(buffer.data) - ptr + 1
421 return (pointer(buffer.data, ptr), nb)
422 end
423
424 notify_filled(buffer::IOBuffer, nread::Int, base::Ptr{Cvoid}, len::UInt) = notify_filled(buffer, nread)
425
426 function notify_filled(buffer::IOBuffer, nread::Int)
427 if buffer.append
428 buffer.size += nread
429 else
430 buffer.ptr += nread
431 end
432 end
433
434 function alloc_buf_hook(stream::LibuvStream, size::UInt)
435 throttle = UInt(stream.throttle)
436 return alloc_request(stream.buffer, (size > throttle) ? throttle : size)
437 end
438
439 function uv_alloc_buf(handle::Ptr{Cvoid}, size::Csize_t, buf::Ptr{Cvoid})
440 hd = uv_handle_data(handle)
441 if hd == C_NULL
442 ccall(:jl_uv_buf_set_len, Cvoid, (Ptr{Cvoid}, Csize_t), buf, 0)
443 return nothing
444 end
445 stream = unsafe_pointer_to_objref(hd)::LibuvStream
446
447 local data::Ptr{Cvoid}, newsize::Csize_t
448 if stream.status != StatusActive
449 data = C_NULL
450 newsize = 0
451 else
452 (data, newsize) = alloc_buf_hook(stream, UInt(size))
453 if data == C_NULL
454 newsize = 0
455 end
456 # avoid aliasing of `nread` with `errno` in uv_readcb
457 # or exceeding the Win32 maximum uv_buf_t len
458 maxsize = @static Sys.iswindows() ? typemax(Cint) : typemax(Cssize_t)
459 newsize > maxsize && (newsize = maxsize)
460 end
461
462 ccall(:jl_uv_buf_set_base, Cvoid, (Ptr{Cvoid}, Ptr{Cvoid}), buf, data)
463 ccall(:jl_uv_buf_set_len, Cvoid, (Ptr{Cvoid}, Csize_t), buf, newsize)
464 nothing
465 end
466
467 function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid})
468 stream_unknown_type = @handle_as handle LibuvStream
469 nrequested = ccall(:jl_uv_buf_len, Csize_t, (Ptr{Cvoid},), buf)
470 function readcb_specialized(stream::LibuvStream, nread::Int, nrequested::UInt)
471 if nread < 0
472 if nread == UV_ENOBUFS && nrequested == 0
473 # remind the client that stream.buffer is full
474 notify(stream.readnotify)
475 elseif nread == UV_EOF
476 if isa(stream, TTY)
477 stream.status = StatusEOF # libuv called uv_stop_reading already
478 notify(stream.readnotify)
479 notify(stream.closenotify)
480 elseif stream.status != StatusClosing
481 # begin shutdown of the stream
482 ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
483 stream.status = StatusClosing
484 end
485 else
486 # This is a fatal connection error. Shutdown requests as per the usual
487 # close function won't work and libuv will fail with an assertion failure
488 ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream)
489 notify_error(stream.readnotify, _UVError("read", nread))
490 end
491 else
492 notify_filled(stream.buffer, nread)
493 notify(stream.readnotify)
494 end
495
496 # Stop background reading when
497 # 1) there's nobody paying attention to the data we are reading
498 # 2) we have accumulated a lot of unread data OR
499 # 3) we have an alternate buffer that has reached its limit.
500 if stream.status == StatusPaused ||
501 (stream.status == StatusActive &&
502 ((bytesavailable(stream.buffer) >= stream.throttle) ||
503 (bytesavailable(stream.buffer) >= stream.buffer.maxsize)))
504 # save cycles by stopping kernel notifications from arriving
505 ccall(:uv_read_stop, Cint, (Ptr{Cvoid},), stream)
506 stream.status = StatusOpen
507 end
508 nothing
509 end
510 readcb_specialized(stream_unknown_type, Int(nread), UInt(nrequested))
511 end
512
513 function reseteof(x::TTY)
514 if x.status == StatusEOF
515 x.status = StatusOpen
516 end
517 nothing
518 end
519
520 function _uv_hook_close(uv::Union{LibuvStream, LibuvServer})
521 uv.handle = C_NULL
522 uv.status = StatusClosed
523 # notify any listeners that exist on this libuv stream type
524 notify(uv.closenotify)
525 isdefined(uv, :readnotify) && notify(uv.readnotify)
526 isdefined(uv, :connectnotify) && notify(uv.connectnotify)
527 nothing
528 end
529
530
531 ##########################################
532 # Pipe Abstraction
533 # (composed of two half-pipes: .in and .out)
534 ##########################################
535
536 mutable struct Pipe <: AbstractPipe
537 in::PipeEndpoint # writable
538 out::PipeEndpoint # readable
539 end
540
541 """
542 Construct an uninitialized Pipe object.
543
544 The appropriate end of the pipe will be automatically initialized if
545 the object is used in process spawning. This can be useful to easily
546 obtain references in process pipelines, e.g.:
547
548 ```
549 julia> err = Pipe()
550
551 # After this `err` will be initialized and you may read `foo`'s
552 # stderr from the `err` pipe.
553 julia> run(pipeline(pipeline(`foo`, stderr=err), `cat`), wait=false)
554 ```
555 """
556 Pipe() = Pipe(PipeEndpoint(), PipeEndpoint())
557 pipe_reader(p::Pipe) = p.out
558 pipe_writer(p::Pipe) = p.in
559
560 function link_pipe!(pipe::Pipe;
561 reader_supports_async = false,
562 writer_supports_async = false)
563 link_pipe!(pipe.out, reader_supports_async, pipe.in, writer_supports_async)
564 return pipe
565 end
566
567 show(io::IO, stream::Pipe) = print(io,
568 "Pipe(",
569 _fd(stream.in), " ",
570 uv_status_string(stream.in), " => ",
571 _fd(stream.out), " ",
572 uv_status_string(stream.out), ", ",
573 bytesavailable(stream), " bytes waiting)")
574
575
576 ## Functions for PipeEndpoint and PipeServer ##
577
578 function open_pipe!(p::PipeEndpoint, handle::OS_HANDLE, readable::Bool, writable::Bool)
579 if p.status != StatusInit
580 error("pipe is already in use or has been closed")
581 end
582 err = ccall(:jl_pipe_open, Int32, (Ptr{Cvoid}, OS_HANDLE, Cint, Cint), p.handle, handle, readable, writable)
583 uv_error("open_pipe", err)
584 p.status = StatusOpen
585 return p
586 end
587
588
589 function link_pipe!(read_end::PipeEndpoint, reader_supports_async::Bool,
590 write_end::PipeEndpoint, writer_supports_async::Bool)
591 rd, wr = link_pipe(reader_supports_async, writer_supports_async)
592 try
593 try
594 open_pipe!(read_end, rd, true, false)
595 catch e
596 close_pipe_sync(rd)
597 rethrow(e)
598 end
599 read_end.status = StatusOpen
600 open_pipe!(write_end, wr, false, true)
601 catch e
602 close_pipe_sync(wr)
603 rethrow(e)
604 end
605 write_end.status = StatusOpen
606 nothing
607 end
608
609 function link_pipe(reader_supports_async::Bool, writer_supports_async::Bool)
610 UV_NONBLOCK_PIPE = 0x40
611 fildes = Ref{Pair{OS_HANDLE, OS_HANDLE}}(INVALID_OS_HANDLE => INVALID_OS_HANDLE) # read (in) => write (out)
612 err = ccall(:uv_pipe, Int32, (Ptr{Pair{OS_HANDLE, OS_HANDLE}}, Cint, Cint),
613 fildes,
614 reader_supports_async * UV_NONBLOCK_PIPE,
615 writer_supports_async * UV_NONBLOCK_PIPE)
616 uv_error("pipe", err)
617 return fildes[]
618 end
619
620 if Sys.iswindows()
621 function close_pipe_sync(handle::WindowsRawSocket)
622 ccall(:CloseHandle, stdcall, Cint, (WindowsRawSocket,), handle)
623 nothing
624 end
625 else
626 function close_pipe_sync(handle::RawFD)
627 ccall(:close, Cint, (RawFD,), handle)
628 nothing
629 end
630 end
631
632 ## Functions for any LibuvStream ##
633
634 # flow control
635
636 function start_reading(stream::LibuvStream)
637 if stream.status == StatusOpen
638 if !isreadable(stream)
639 error("tried to read a stream that is not readable")
640 end
641 # libuv may call the alloc callback immediately
642 # for a TTY on Windows, so ensure the status is set first
643 stream.status = StatusActive
644 ret = ccall(:uv_read_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}),
645 stream, uv_jl_alloc_buf::Ptr{Cvoid}, uv_jl_readcb::Ptr{Cvoid})
646 return ret
647 elseif stream.status == StatusPaused
648 stream.status = StatusActive
649 return Int32(0)
650 elseif stream.status == StatusActive
651 return Int32(0)
652 else
653 return Int32(-1)
654 end
655 end
656
657 if Sys.iswindows()
658 # the low performance version of stop_reading is required
659 # on Windows due to a NT kernel bug that we can't use a blocking
660 # stream for non-blocking (overlapped) calls,
661 # and a ReadFile call blocking on one thread
662 # causes all other operations on that stream to lockup
663 function stop_reading(stream::LibuvStream)
664 if stream.status == StatusActive
665 stream.status = StatusOpen
666 ccall(:uv_read_stop, Cint, (Ptr{Cvoid},), stream)
667 end
668 nothing
669 end
670 else
671 function stop_reading(stream::LibuvStream)
672 if stream.status == StatusActive
673 stream.status = StatusPaused
674 end
675 nothing
676 end
677 end
678
679 # bulk read / write
680
681 readbytes!(s::LibuvStream, a::Vector{UInt8}, nb = length(a)) = readbytes!(s, a, Int(nb))
682 function readbytes!(s::LibuvStream, a::Vector{UInt8}, nb::Int)
683 sbuf = s.buffer
684 @assert sbuf.seekable == false
685 @assert sbuf.maxsize >= nb
686
687 if bytesavailable(sbuf) >= nb
688 return readbytes!(sbuf, a, nb)
689 end
690
691 if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer
692 wait_readnb(s, nb)
693 return readbytes!(sbuf, a, nb)
694 else
695 try
696 stop_reading(s) # Just playing it safe, since we are going to switch buffers.
697 newbuf = PipeBuffer(a, maxsize = nb)
698 newbuf.size = 0 # reset the write pointer to the beginning
699 s.buffer = newbuf
700 write(newbuf, sbuf)
701 wait_readnb(s, Int(nb))
702 compact(newbuf)
703 return bytesavailable(newbuf)
704 finally
705 s.buffer = sbuf
706 if !isempty(s.readnotify.waitq)
707 start_reading(s) # resume reading iff there are currently other read clients of the stream
708 end
709 end
710 end
711 @assert false # unreachable
712 end
713
714 function read(stream::LibuvStream)
715 wait_readnb(stream, typemax(Int))
716 return take!(stream.buffer)
717 end
718
719 function unsafe_read(s::LibuvStream, p::Ptr{UInt8}, nb::UInt)
720 sbuf = s.buffer
721 @assert sbuf.seekable == false
722 @assert sbuf.maxsize >= nb
723
724 if bytesavailable(sbuf) >= nb
725 return unsafe_read(sbuf, p, nb)
726 end
727
728 if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer
729 wait_readnb(s, Int(nb))
730 unsafe_read(sbuf, p, nb)
731 else
732 try
733 stop_reading(s) # Just playing it safe, since we are going to switch buffers.
734 newbuf = PipeBuffer(unsafe_wrap(Array, p, nb), maxsize = Int(nb))
735 newbuf.size = 0 # reset the write pointer to the beginning
736 s.buffer = newbuf
737 write(newbuf, sbuf)
738 wait_readnb(s, Int(nb))
739 nb == bytesavailable(newbuf) || throw(EOFError())
740 finally
741 s.buffer = sbuf
742 if !isempty(s.readnotify.waitq)
743 start_reading(s) # resume reading iff there are currently other read clients of the stream
744 end
745 end
746 end
747 nothing
748 end
749
750 function read(this::LibuvStream, ::Type{UInt8})
751 wait_readnb(this, 1)
752 buf = this.buffer
753 @assert buf.seekable == false
754 return read(buf, UInt8)
755 end
756
757 function readavailable(this::LibuvStream)
758 wait_readnb(this, 1)
759 buf = this.buffer
760 @assert buf.seekable == false
761 return take!(buf)
762 end
763
764 function readuntil(this::LibuvStream, c::UInt8; keep::Bool=false)
765 wait_readbyte(this, c)
766 buf = this.buffer
767 @assert buf.seekable == false
768 return readuntil(buf, c, keep=keep)
769 end
770
771 uv_write(s::LibuvStream, p::Vector{UInt8}) = uv_write(s, pointer(p), UInt(sizeof(p)))
772
773
3 (7.14%) samples spent in uv_write
0 (ex.), 3 (100.00%) (incl.) when called from unsafe_write line 830
function uv_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
774 uvw = uv_write_async(s, p, n)
775 ct = current_task()
776 preserve_handle(ct)
777 try
778 # wait for the last chunk to complete (or error)
779 # assume that any errors would be sticky,
780 # (so we don't need to monitor the error status of the intermediate writes)
781 uv_req_set_data(uvw, ct)
782 3 (7.14%)
3 (100.00%) samples spent calling wait
wait()
783 finally
784 if uv_req_data(uvw) != C_NULL
785 # uvw is still alive,
786 # so make sure we won't get spurious notifications later
787 uv_req_set_data(uvw, C_NULL)
788 else
789 # done with uvw
790 Libc.free(uvw)
791 end
792 unpreserve_handle(ct)
793 end
794 return Int(n)
795 end
796
797 # helper function for uv_write that returns the uv_write_t struct for the write
798 # rather than waiting on it
799 function uv_write_async(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
800 check_open(s)
801 while true
802 uvw = Libc.malloc(_sizeof_uv_write)
803 uv_req_set_data(uvw, C_NULL) # in case we get interrupted before arriving at the wait call
804 nwrite = min(n, MAX_OS_WRITE) # split up the write into chunks the OS can handle.
805 # TODO: use writev, when that is added to uv-win
806 err = ccall(:jl_uv_write,
807 Int32,
808 (Ptr{Cvoid}, Ptr{Cvoid}, UInt, Ptr{Cvoid}, Ptr{Cvoid}),
809 s, p, nwrite, uvw,
810 uv_jl_writecb_task::Ptr{Cvoid})
811 if err < 0
812 Libc.free(uvw)
813 uv_error("write", err)
814 end
815 n -= nwrite
816 p += nwrite
817 if n == 0
818 return uvw
819 end
820 end
821 end
822
823
824 # Optimized send
825 # - smaller writes are buffered, final uv write on flush or when buffer full
826 # - large isbits arrays are unbuffered and written directly
827
828
3 (7.14%) samples spent in unsafe_write
0 (ex.), 3 (100.00%) (incl.) when called from print line 87
function unsafe_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
829 if s.sendbuf === nothing
830 3 (7.14%)
3 (100.00%) samples spent calling uv_write
return uv_write(s, p, UInt(n))
831 end
832
833 buf = s.sendbuf
834 totb = bytesavailable(buf) + n
835 if totb < buf.maxsize
836 nb = unsafe_write(buf, p, n)
837 else
838 flush(s)
839 if n > buf.maxsize
840 nb = uv_write(s, p, n)
841 else
842 nb = unsafe_write(buf, p, n)
843 end
844 end
845 return nb
846 end
847
848 function flush(s::LibuvStream)
849 buf = s.sendbuf
850 if buf !== nothing
851 if bytesavailable(buf) > 0
852 arr = take!(buf) # Array of UInt8s
853 uv_write(s, arr)
854 return
855 end
856 end
857 uv_write(s, Ptr{UInt8}(Base.eventloop()), UInt(0)) # zero write from a random pointer to flush current queue
858 return
859 end
860
861 buffer_writes(s::LibuvStream, bufsize) = (s.sendbuf=PipeBuffer(bufsize); s)
862
863 ## low-level calls to libuv ##
864
865 function write(s::LibuvStream, b::UInt8)
866 buf = s.sendbuf
867 if buf !== nothing
868 if bytesavailable(buf) + 1 < buf.maxsize
869 return write(buf, b)
870 end
871 end
872 return write(s, Ref{UInt8}(b))
873 end
874
875 function uv_writecb_task(req::Ptr{Cvoid}, status::Cint)
876 d = uv_req_data(req)
877 if d != C_NULL
878 uv_req_set_data(req, C_NULL) # let the Task know we got the writecb
879 t = unsafe_pointer_to_objref(d)::Task
880 if status < 0
881 err = _UVError("write", status)
882 schedule(t, err, error=true)
883 else
884 schedule(t)
885 end
886 else
887 # no owner for this req, safe to just free it
888 Libc.free(req)
889 end
890 nothing
891 end
892
893 _fd(x::IOStream) = RawFD(fd(x))
894
895 function _fd(x::Union{LibuvStream, LibuvServer})
896 fd = Ref{OS_HANDLE}(INVALID_OS_HANDLE)
897 if x.status != StatusUninit && x.status != StatusClosed
898 err = ccall(:uv_fileno, Int32, (Ptr{Cvoid}, Ptr{OS_HANDLE}), x.handle, fd)
899 # handle errors by returning INVALID_OS_HANDLE
900 end
901 return fd[]
902 end
903
904 for (x, writable, unix_fd, c_symbol) in
905 ((:stdin, false, 0, :jl_uv_stdin),
906 (:stdout, true, 1, :jl_uv_stdout),
907 (:stderr, true, 2, :jl_uv_stderr))
908 f = Symbol("redirect_", lowercase(string(x)))
909 _f = Symbol("_", f)
910 @eval begin
911 function ($_f)(stream)
912 global $x
913 posix_fd = _fd(stream)
914 @static if Sys.iswindows()
915 ccall(:SetStdHandle, stdcall, Int32, (Int32, OS_HANDLE),
916 $(-10 - unix_fd), Libc._get_osfhandle(posix_fd))
917 end
918 dup(posix_fd, RawFD($unix_fd))
919 $x = stream
920 nothing
921 end
922 function ($f)(handle::Union{LibuvStream, IOStream})
923 $(_f)(handle)
924 unsafe_store!(cglobal($(Expr(:quote, c_symbol)), Ptr{Cvoid}),
925 handle.handle)
926 return handle
927 end
928 function ($f)()
929 p = link_pipe!(Pipe())
930 read, write = p.out, p.in
931 ($f)($(writable ? :write : :read))
932 return (read, write)
933 end
934 end
935 end
936
937 """
938 redirect_stdout([stream]) -> (rd, wr)
939
940 Create a pipe to which all C and Julia level [`stdout`](@ref) output
941 will be redirected.
942 Returns a tuple `(rd, wr)` representing the pipe ends.
943 Data written to [`stdout`](@ref) may now be read from the `rd` end of
944 the pipe. The `wr` end is given for convenience in case the old
945 [`stdout`](@ref) object was cached by the user and needs to be replaced
946 elsewhere.
947
948 If called with the optional `stream` argument, then returns `stream` itself.
949
950 !!! note
951 `stream` must be a `TTY`, a `Pipe`, or a socket.
952 """
953 redirect_stdout
954
955 """
956 redirect_stderr([stream]) -> (rd, wr)
957
958 Like [`redirect_stdout`](@ref), but for [`stderr`](@ref).
959
960 !!! note
961 `stream` must be a `TTY`, a `Pipe`, or a socket.
962 """
963 redirect_stderr
964
965 """
966 redirect_stdin([stream]) -> (rd, wr)
967
968 Like [`redirect_stdout`](@ref), but for [`stdin`](@ref).
969 Note that the order of the return tuple is still `(rd, wr)`,
970 i.e. data to be read from [`stdin`](@ref) may be written to `wr`.
971
972 !!! note
973 `stream` must be a `TTY`, a `Pipe`, or a socket.
974 """
975 redirect_stdin
976
977 for (F,S) in ((:redirect_stdin, :stdin), (:redirect_stdout, :stdout), (:redirect_stderr, :stderr))
978 @eval function $F(f::Function, stream)
979 STDOLD = $S
980 local ret
981 $F(stream)
982 try
983 ret = f()
984 finally
985 $F(STDOLD)
986 end
987 ret
988 end
989 end
990
991 """
992 redirect_stdout(f::Function, stream)
993
994 Run the function `f` while redirecting [`stdout`](@ref) to `stream`.
995 Upon completion, [`stdout`](@ref) is restored to its prior setting.
996
997 !!! note
998 `stream` must be a `TTY`, a `Pipe`, or a socket.
999 """
1000 redirect_stdout(f::Function, stream)
1001
1002 """
1003 redirect_stderr(f::Function, stream)
1004
1005 Run the function `f` while redirecting [`stderr`](@ref) to `stream`.
1006 Upon completion, [`stderr`](@ref) is restored to its prior setting.
1007
1008 !!! note
1009 `stream` must be a `TTY`, a `Pipe`, or a socket.
1010 """
1011 redirect_stderr(f::Function, stream)
1012
1013 """
1014 redirect_stdin(f::Function, stream)
1015
1016 Run the function `f` while redirecting [`stdin`](@ref) to `stream`.
1017 Upon completion, [`stdin`](@ref) is restored to its prior setting.
1018
1019 !!! note
1020 `stream` must be a `TTY`, a `Pipe`, or a socket.
1021 """
1022 redirect_stdin(f::Function, stream)
1023
1024 mark(x::LibuvStream) = mark(x.buffer)
1025 unmark(x::LibuvStream) = unmark(x.buffer)
1026 reset(x::LibuvStream) = reset(x.buffer)
1027 ismarked(x::LibuvStream) = ismarked(x.buffer)
1028
1029 function peek(s::LibuvStream)
1030 mark(s)
1031 try read(s, UInt8)
1032 finally
1033 reset(s)
1034 end
1035 end
1036
1037 # BufferStream's are non-OS streams, backed by a regular IOBuffer
1038 mutable struct BufferStream <: LibuvStream
1039 buffer::IOBuffer
1040 r_c::Condition
1041 close_c::Condition
1042 is_open::Bool
1043 buffer_writes::Bool
1044 lock::ReentrantLock
1045
1046 BufferStream() = new(PipeBuffer(), Condition(), Condition(), true, false, ReentrantLock())
1047 end
1048
1049 isopen(s::BufferStream) = s.is_open
1050 function close(s::BufferStream)
1051 s.is_open = false
1052 notify(s.r_c)
1053 notify(s.close_c)
1054 nothing
1055 end
1056 uvfinalize(s::BufferStream) = nothing
1057
1058 read(s::BufferStream, ::Type{UInt8}) = (wait_readnb(s, 1); read(s.buffer, UInt8))
1059 unsafe_read(s::BufferStream, a::Ptr{UInt8}, nb::UInt) = (wait_readnb(s, Int(nb)); unsafe_read(s.buffer, a, nb))
1060 bytesavailable(s::BufferStream) = bytesavailable(s.buffer)
1061
1062 isreadable(s::BufferStream) = s.buffer.readable
1063 iswritable(s::BufferStream) = s.buffer.writable
1064
1065 function wait_readnb(s::BufferStream, nb::Int)
1066 while isopen(s) && bytesavailable(s.buffer) < nb
1067 wait(s.r_c)
1068 end
1069 end
1070
1071 show(io::IO, s::BufferStream) = print(io,"BufferStream() bytes waiting:",bytesavailable(s.buffer),", isopen:", s.is_open)
1072
1073 function wait_readbyte(s::BufferStream, c::UInt8)
1074 while isopen(s) && !occursin(c, s.buffer)
1075 wait(s.r_c)
1076 end
1077 end
1078
1079 wait_close(s::BufferStream) = if isopen(s); wait(s.close_c); end
1080 start_reading(s::BufferStream) = Int32(0)
1081 stop_reading(s::BufferStream) = nothing
1082
1083 write(s::BufferStream, b::UInt8) = write(s, Ref{UInt8}(b))
1084 function unsafe_write(s::BufferStream, p::Ptr{UInt8}, nb::UInt)
1085 rv = unsafe_write(s.buffer, p, nb)
1086 !(s.buffer_writes) && notify(s.r_c)
1087 return rv
1088 end
1089
1090 function eof(s::BufferStream)
1091 wait_readnb(s, 1)
1092 return !isopen(s) && bytesavailable(s)<=0
1093 end
1094
1095 # If buffer_writes is called, it will delay notifying waiters till a flush is called.
1096 buffer_writes(s::BufferStream, bufsize=0) = (s.buffer_writes=true; s)
1097 flush(s::BufferStream) = (notify(s.r_c); nothing)