Line | Exclusive | Inclusive | Code |
---|---|---|---|
1 | # This file is a part of Julia. License is MIT: https://julialang.org/license | ||
2 | |||
3 | import .Libc: RawFD, dup | ||
# Select the native OS handle type for this platform, plus the sentinel
# value used to represent "no handle".
if !Sys.iswindows()
    const OS_HANDLE = RawFD
    const INVALID_OS_HANDLE = RawFD(-1)
else
    import .Libc: WindowsRawSocket
    const OS_HANDLE = WindowsRawSocket
    const INVALID_OS_HANDLE = WindowsRawSocket(Ptr{Cvoid}(-1))
end
12 | |||
13 | |||
## types ##
# Root of the stream types in this file; subtypes are expected to carry the
# `handle`/`status`/`buffer` fields used by the shared methods below.
abstract type LibuvStream <: IO end
# Root of the server-side (listening) object hierarchy.
abstract type IOServer end
abstract type LibuvServer <: IOServer end
18 | |||
19 | |||
20 | # IO | ||
21 | # +- GenericIOBuffer{T<:AbstractArray{UInt8,1}} (not exported) | ||
22 | # +- AbstractPipe (not exported) | ||
23 | # . +- Pipe | ||
24 | # . +- Process (not exported) | ||
25 | # . +- ProcessChain (not exported) | ||
26 | # +- BufferStream | ||
27 | # +- DevNull (not exported) | ||
28 | # +- Filesystem.File | ||
29 | # +- LibuvStream (not exported) | ||
30 | # . +- PipeEndpoint (not exported) | ||
31 | # . +- TCPSocket | ||
32 | # . +- TTY (not exported) | ||
33 | # . +- UDPSocket | ||
34 | # +- IOBuffer = Base.GenericIOBuffer{Array{UInt8,1}} | ||
35 | # +- IOStream | ||
36 | |||
37 | # IOServer | ||
38 | # +- LibuvServer | ||
39 | # . +- PipeServer | ||
40 | # . +- TCPServer | ||
41 | |||
42 | # Redirectable = Union{IO, FileRedirect, Libc.RawFD} (not exported) | ||
43 | |||
# Wait on the condition(s) `c...` while `x` is held via `preserve_handle`.
# The handle is always released again, whether `wait` returns or throws.
function stream_wait(x, c...) # for x::LibuvObject
    preserve_handle(x)
    result = try
        wait(c...)
    finally
        unpreserve_handle(x)
    end
    return result
end
52 | |||
# Number of bytes already sitting in the stream's read buffer.
function bytesavailable(s::LibuvStream)
    return bytesavailable(s.buffer)
end
54 | |||
# Report whether the stream is at end-of-file.
# A closed stream is at EOF iff nothing is buffered; an open stream with
# buffered data is not. Otherwise block until at least one byte arrives
# (or the stream closes) and re-evaluate.
function eof(s::LibuvStream)
    # fast paths: no waiting needed
    isopen(s) || return bytesavailable(s) <= 0
    bytesavailable(s) > 0 && return false
    wait_readnb(s, 1)
    return !isopen(s) && bytesavailable(s) <= 0
end
64 | |||
# Cap the default maximum read/buffer size so that a flood of input
# cannot drive us into an out-of-memory situation.
const DEFAULT_READ_BUFFER_SZ = 10 * 1024 * 1024 # 10 MB

# Manually limit the size of a single write when the OS does not support
# full-size writes.
# Windows: 511 MB (determined semi-empirically, limited to 31 MB on XP)
const MAX_OS_WRITE = Sys.iswindows() ? UInt(0x1FF0_0000) : UInt(typemax(Csize_t))
75 | |||
76 | |||
# Lifecycle states stored in the `status` field of libuv-backed objects.
const StatusUninit      = 0 # handle is allocated, but not initialized
const StatusInit        = 1 # handle is valid, but not connected/active
const StatusConnecting  = 2 # handle is in process of connecting
const StatusOpen        = 3 # handle is usable
const StatusActive      = 4 # handle is listening for read/write/connect events
const StatusClosing     = 5 # handle is closing / being closed
const StatusClosed      = 6 # handle is closed
const StatusEOF         = 7 # handle is a TTY that has seen an EOF event
const StatusPaused      = 8 # handle is Active, but not consuming events, and will transition to Open if it receives an event

# Human-readable name for the state of `x` (anything with `status` and
# `handle` fields). Unknown combinations yield "invalid status".
function uv_status_string(x)
    status = x.status
    if x.handle == C_NULL
        # With a null handle only "closed" and "never initialized" make sense.
        status == StatusClosed && return "closed"
        status == StatusUninit && return "null"
        return "invalid status"
    end
    status == StatusUninit     && return "uninit"
    status == StatusInit       && return "init"
    status == StatusConnecting && return "connecting"
    status == StatusOpen       && return "open"
    status == StatusActive     && return "active"
    status == StatusPaused     && return "paused"
    status == StatusClosing    && return "closing"
    status == StatusClosed     && return "closed"
    status == StatusEOF        && return "eof"
    return "invalid status"
end
116 | |||
# One end of a libuv pipe.
mutable struct PipeEndpoint <: LibuvStream
    handle::Ptr{Cvoid}                 # pointer passed to the libuv ccalls
    status::Int                        # one of the Status* constants
    buffer::IOBuffer                   # data read but not yet consumed
    readnotify::Condition
    connectnotify::Condition
    closenotify::Condition
    sendbuf::Union{IOBuffer, Nothing}
    lock::ReentrantLock
    throttle::Int                      # starts at DEFAULT_READ_BUFFER_SZ
    # Wrap an existing `handle`, register the new object with it, and attach
    # `uvfinalize` as the finalizer.
    function PipeEndpoint(handle::Ptr{Cvoid}, status)
        endpoint = new(handle, status, PipeBuffer(),
                       Condition(), Condition(), Condition(),
                       nothing, ReentrantLock(), DEFAULT_READ_BUFFER_SZ)
        associate_julia_struct(handle, endpoint)
        finalizer(uvfinalize, endpoint)
        return endpoint
    end
end
142 | |||
# Allocate a fresh named-pipe handle and initialize it on the event loop.
# Throws (via `uv_error`) if `uv_pipe_init` reports a failure.
function PipeEndpoint()
    endpoint = PipeEndpoint(Libc.malloc(_sizeof_uv_named_pipe), StatusUninit)
    rc = ccall(:uv_pipe_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Cint),
               eventloop(), endpoint.handle, 0)
    uv_error("failed to create pipe endpoint", rc)
    endpoint.status = StatusInit
    return endpoint
end
150 | |||
# A terminal stream backed by a libuv handle.
mutable struct TTY <: LibuvStream
    handle::Ptr{Cvoid}                 # pointer passed to the libuv ccalls
    status::Int                        # one of the Status* constants
    buffer::IOBuffer                   # data read but not yet consumed
    readnotify::Condition
    closenotify::Condition
    sendbuf::Union{IOBuffer, Nothing}
    lock::ReentrantLock
    throttle::Int                      # starts at DEFAULT_READ_BUFFER_SZ
    @static if Sys.iswindows(); ispty::Bool; end
    # Wrap an existing `handle`, register the new object with it, and attach
    # `uvfinalize` as the finalizer. On Windows also record what `jl_ispty`
    # reports for the handle.
    function TTY(handle::Ptr{Cvoid}, status)
        term = new(handle, status, PipeBuffer(),
                   Condition(), Condition(),
                   nothing, ReentrantLock(), DEFAULT_READ_BUFFER_SZ)
        associate_julia_struct(handle, term)
        finalizer(uvfinalize, term)
        @static if Sys.iswindows()
            term.ispty = ccall(:jl_ispty, Cint, (Ptr{Cvoid},), handle) != 0
        end
        return term
    end
end
179 | |||
# Create a TTY stream on top of the file descriptor `fd`.
# Throws (via `uv_error`) if `uv_tty_init` reports a failure.
function TTY(fd::RawFD; readable::Bool = false)
    term = TTY(Libc.malloc(_sizeof_uv_tty), StatusUninit)
    # uv_tty_init must run after associate_julia_struct (done by the inner
    # constructor) so that there is no garbage in the ->data field
    rc = ccall(:uv_tty_init, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, RawFD, Int32),
               eventloop(), term.handle, fd, readable)
    uv_error("TTY", rc)
    term.status = StatusOpen
    return term
end
190 | |||
# Print a server as `Type(fd status)`.
function show(io::IO, stream::LibuvServer)
    print(io, typeof(stream), "(", _fd(stream), " ", uv_status_string(stream), ")")
end

# Print a stream as `Type(fd status, N bytes waiting)`.
function show(io::IO, stream::LibuvStream)
    print(io, typeof(stream), "(", _fd(stream), " ", uv_status_string(stream), ", ",
          bytesavailable(stream.buffer), " bytes waiting)")
end
198 | |||
199 | # Shared LibuvStream object interface | ||
200 | |||
# A stream is readable if data is already buffered, or if it is open and
# libuv reports the handle as readable.
function isreadable(io::LibuvStream)
    if bytesavailable(io) > 0
        return true
    elseif !isopen(io)
        return false
    end
    return ccall(:uv_is_readable, Cint, (Ptr{Cvoid},), io.handle) != 0
end
206 | |||
# A stream is writable only while it is open, not in the middle of closing,
# and libuv reports the handle as writable.
function iswritable(io::LibuvStream)
    if !isopen(io) || io.status == StatusClosing
        return false
    end
    return ccall(:uv_is_writable, Cint, (Ptr{Cvoid},), io.handle) != 0
end
212 | |||
# Locking a stream delegates to its `lock` field.
function lock(s::LibuvStream)
    return lock(s.lock)
end
function unlock(s::LibuvStream)
    return unlock(s.lock)
end

# Expose the raw handle pointer, both explicitly and for use in `ccall`.
function rawhandle(stream::LibuvStream)
    return stream.handle
end
function unsafe_convert(::Type{Ptr{Cvoid}}, s::Union{LibuvStream, LibuvServer})
    return s.handle
end
218 | |||
# Wrap a standard-I/O libuv handle in the matching Julia IO object, chosen by
# the handle type reported by `jl_uv_handle_type`.
# Throws `ArgumentError` for handle types that are not supported.
function init_stdio(handle::Ptr{Cvoid})
    kind = ccall(:jl_uv_handle_type, Int32, (Ptr{Cvoid},), handle)
    if kind == UV_TTY
        return TTY(handle, StatusOpen)
    elseif kind == UV_NAMED_PIPE
        return PipeEndpoint(handle, StatusOpen)
    elseif kind == UV_TCP
        Sockets = require(PkgId(UUID((0x6462fe0b_24de_5631, 0x8697_dd941f90decc)), "Sockets"))
        return Sockets.TCPSocket(handle, StatusOpen)
    elseif kind == UV_FILE
        fd = ccall(:jl_uv_file_handle, OS_HANDLE, (Ptr{Cvoid},), handle)
        # TODO: Replace ios.c file with libuv fs?
        # return File(fd)
        @static if Sys.iswindows()
            # TODO: Get ios.c to understand native handles
            fd = ccall(:_open_osfhandle, RawFD, (WindowsRawSocket, Int32), fd, 0)
        end
        # TODO: Get fdio to work natively with file descriptors instead of integers
        return fdio(cconvert(Cint, fd))
    end
    throw(ArgumentError("invalid stdio type: $kind"))
end
242 | |||
# True unless the object is closed or has seen EOF.
# Throws `ArgumentError` when the object has not been initialized yet.
function isopen(x::Union{LibuvStream, LibuvServer})
    if x.status in (StatusUninit, StatusInit)
        throw(ArgumentError("$x is not initialized"))
    end
    return !(x.status == StatusClosed || x.status == StatusEOF)
end
249 | |||
# Throw `ArgumentError` unless `x` is open and not currently closing.
function check_open(x::Union{LibuvStream, LibuvServer})
    usable = isopen(x) && x.status != StatusClosing
    usable || throw(ArgumentError("stream is closed or unusable"))
    return nothing
end
255 | |||
# Block while `x` is still connecting. The object is re-validated with
# `check_open` before the first wait and after every wake-up.
function wait_connected(x::Union{LibuvStream, LibuvServer})
    while true
        check_open(x)
        x.status == StatusConnecting || break
        stream_wait(x, x.connectnotify)
    end
    return nothing
end
263 | |||
# Block until byte `c` shows up in the stream's buffer or the stream is no
# longer open. Always returns `nothing`.
function wait_readbyte(x::LibuvStream, c::UInt8)
    # fast path: nothing to wait for
    isopen(x) || return
    occursin(c, x.buffer) && return
    preserve_handle(x)
    try
        while isopen(x) && !occursin(c, x.buffer)
            start_reading(x) # ensure we are reading
            wait(x.readnotify)
        end
    finally
        # stop reading iff there are currently no other read clients of the stream
        isempty(x.readnotify.waitq) && stop_reading(x)
        unpreserve_handle(x)
    end
    return nothing
end
284 | |||
# Block until at least `nb` bytes are buffered or the stream is no longer
# open. While waiting, the throttle is raised to at least `nb`; afterwards it
# is put back if it still lies between the saved value and `nb`.
# Always returns `nothing`.
function wait_readnb(x::LibuvStream, nb::Int)
    # fast path: nothing to wait for
    isopen(x) || return
    bytesavailable(x.buffer) >= nb && return
    saved_throttle = x.throttle
    preserve_handle(x)
    try
        while isopen(x) && bytesavailable(x.buffer) < nb
            x.throttle = max(nb, x.throttle)
            start_reading(x) # ensure we are reading
            wait(x.readnotify)
        end
    finally
        # stop reading iff there are currently no other read clients of the stream
        isempty(x.readnotify.waitq) && stop_reading(x)
        if saved_throttle <= x.throttle <= nb
            x.throttle = saved_throttle
        end
        unpreserve_handle(x)
    end
    return nothing
end
310 | |||
# Block on `closenotify` if `x` is still open; return immediately otherwise.
function wait_close(x::Union{LibuvStream, LibuvServer})
    isopen(x) && stream_wait(x, x.closenotify)
    return nothing
end
317 | |||
# Close a stream or server.
# An initialized-but-unused handle is force-closed right away. An open one is
# asked to close (once), and the caller then waits on `closenotify` provided
# the handle still has Julia data associated with it.
function close(stream::Union{LibuvStream, LibuvServer})
    if stream.status == StatusInit
        ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
        return nothing
    end
    isopen(stream) || return nothing
    if stream.status != StatusClosing
        ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
        stream.status = StatusClosing
    end
    if uv_handle_data(stream) != C_NULL
        stream_wait(stream, stream.closenotify)
    end
    return nothing
end
332 | |||
# Finalizer for libuv-backed objects: detach the Julia object from the
# handle, then either close the handle or, if it was never initialized, free
# its memory directly. The object ends up Closed with a null handle.
function uvfinalize(uv::Union{LibuvStream, LibuvServer})
    uv.handle == C_NULL && return nothing
    disassociate_julia_struct(uv.handle) # not going to call the usual close hooks
    if uv.status == StatusUninit
        Libc.free(uv.handle)
    else
        close(uv)
    end
    uv.status = StatusClosed
    uv.handle = C_NULL
    return nothing
end
346 | |||
if Sys.iswindows()
    # Only a TTY records whether it is a pty; every other IO reports false.
    function ispty(s::TTY)
        return s.ispty
    end
    function ispty(s::IO)
        return false
    end
end
351 | |||
"""
    displaysize([io::IO]) -> (lines, columns)

Return the nominal size of the screen that may be used for rendering output to
this `IO` object.
If no input is provided, the environment variables `LINES` and `COLUMNS` are read.
If those are not set, or do not hold a valid integer, a default size of
`(24, 80)` is returned.

# Examples
```jldoctest
julia> withenv("LINES" => 30, "COLUMNS" => 100) do
           displaysize()
       end
(30, 100)
```

To get your TTY size,

```julia
julia> displaysize(stdout)
(34, 147)
```
"""
displaysize(io::IO) = displaysize()
displaysize() = (_displaysize_env("LINES", 24),
                 _displaysize_env("COLUMNS", 80))::Tuple{Int, Int}

# Read the integer stored in environment variable `name`. Returns `default`
# when the variable is unset, and also when it is set but empty or otherwise
# not parseable as an `Int`, so a stray value cannot make every
# `displaysize` caller throw.
function _displaysize_env(name::String, default::Int)
    parsed = tryparse(Int, get(ENV, name, ""))
    return parsed === nothing ? default : parsed
end
378 | |||
# Size of a terminal as `(lines, columns)`. Any dimension that comes back
# non-positive is replaced by the corresponding value from `displaysize()`.
function displaysize(io::TTY)
    local h::Int, w::Int
    fallback = displaysize()

    @static if Sys.iswindows()
        if ispty(io)
            # io is actually a libuv pipe but a cygwin/msys2 pty
            try
                h, w = parse.(Int, split(read(open(Base.Cmd(String["stty", "size"]), "r", io).out, String)))
                h > 0 || (h = fallback[1])
                w > 0 || (w = fallback[2])
                return h, w
            catch
                return fallback
            end
        end
    end

    cols_ref = Ref{Int32}(0)
    rows_ref = Ref{Int32}(0)
    # NOTE(review): `!= 0` turns the return code into a Bool before it reaches
    # uv_error; confirm whether a failure is meant to throw here or to fall
    # through to the defaults below.
    Base.uv_error("size (TTY)", ccall(:uv_tty_get_winsize,
                                      Int32, (Ptr{Cvoid}, Ptr{Int32}, Ptr{Int32}),
                                      io, cols_ref, rows_ref) != 0)
    w, h = cols_ref[], rows_ref[]
    h > 0 || (h = fallback[1])
    w > 0 || (w = fallback[2])
    return h, w
end
407 | |||
# A TTY exposes exactly one property, `:color`, whose value is the global
# `have_color`.
function in(key_value::Pair{Symbol,Bool}, ::TTY)
    return key_value.first === :color && key_value.second === have_color
end
function haskey(::TTY, key::Symbol)
    return key === :color
end
function getindex(::TTY, key::Symbol)
    key === :color || throw(KeyError(key))
    return have_color
end
function get(::TTY, key::Symbol, default)
    return key === :color ? have_color : default
end
412 | |||
413 | ### Libuv callbacks ### | ||
414 | |||
415 | ## BUFFER ## | ||
416 | ## Allocate space in buffer (for immediate use) | ||
# Make room for `recommended_size` more bytes in `buffer` and return
# `(pointer, nbytes)` for the free region: it starts after the written data
# for an append-mode buffer and at `ptr` otherwise, and runs to the end of
# the backing array.
function alloc_request(buffer::IOBuffer, recommended_size::UInt)
    ensureroom(buffer, Int(recommended_size))
    start = if buffer.append
        buffer.size + 1
    else
        buffer.ptr
    end
    available = length(buffer.data) - start + 1
    return (pointer(buffer.data, start), available)
end
423 | |||
# The 4-argument form ignores `base` and `len` and forwards to the
# 2-argument form.
function notify_filled(buffer::IOBuffer, nread::Int, base::Ptr{Cvoid}, len::UInt)
    return notify_filled(buffer, nread)
end

# Record that `nread` bytes were written straight into `buffer`'s storage:
# advance `size` for an append-mode buffer, `ptr` otherwise.
function notify_filled(buffer::IOBuffer, nread::Int)
    if !buffer.append
        buffer.ptr += nread
    else
        buffer.size += nread
    end
end
433 | |||
# Allocate read space in the stream's buffer, never asking for more than
# the stream's throttle.
function alloc_buf_hook(stream::LibuvStream, size::UInt)
    limit = UInt(stream.throttle)
    return alloc_request(stream.buffer, min(size, limit))
end
438 | |||
# Allocation callback: fill in `buf` with the base pointer and length of the
# region to read into. A zero length is reported when the handle has no
# Julia object attached or the stream is not in the Active state.
function uv_alloc_buf(handle::Ptr{Cvoid}, size::Csize_t, buf::Ptr{Cvoid})
    hd = uv_handle_data(handle)
    if hd == C_NULL
        ccall(:jl_uv_buf_set_len, Cvoid, (Ptr{Cvoid}, Csize_t), buf, 0)
        return nothing
    end
    stream = unsafe_pointer_to_objref(hd)::LibuvStream

    local data::Ptr{Cvoid}, newsize::Csize_t
    if stream.status == StatusActive
        (data, newsize) = alloc_buf_hook(stream, UInt(size))
        data == C_NULL && (newsize = 0)
        # avoid aliasing of `nread` with `errno` in uv_readcb
        # or exceeding the Win32 maximum uv_buf_t len
        maxsize = @static Sys.iswindows() ? typemax(Cint) : typemax(Cssize_t)
        if newsize > maxsize
            newsize = maxsize
        end
    else
        data = C_NULL
        newsize = 0
    end

    ccall(:jl_uv_buf_set_base, Cvoid, (Ptr{Cvoid}, Ptr{Cvoid}), buf, data)
    ccall(:jl_uv_buf_set_len, Cvoid, (Ptr{Cvoid}, Csize_t), buf, newsize)
    return nothing
end
466 | |||
# Read callback: account for newly read bytes or react to the error code in
# `nread`, wake up waiting readers, and stop reading when nobody is
# consuming data or the buffer has reached its limit.
function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid})
    stream_unknown_type = @handle_as handle LibuvStream
    nrequested = ccall(:jl_uv_buf_len, Csize_t, (Ptr{Cvoid},), buf)
    function readcb_specialized(stream::LibuvStream, nread::Int, nrequested::UInt)
        if nread >= 0
            notify_filled(stream.buffer, nread)
            notify(stream.readnotify)
        elseif nread == UV_ENOBUFS && nrequested == 0
            # remind the client that stream.buffer is full
            notify(stream.readnotify)
        elseif nread == UV_EOF
            if isa(stream, TTY)
                stream.status = StatusEOF # libuv called uv_stop_reading already
                notify(stream.readnotify)
                notify(stream.closenotify)
            elseif stream.status != StatusClosing
                # begin shutdown of the stream
                ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
                stream.status = StatusClosing
            end
        else
            # This is a fatal connection error. Shutdown requests as per the usual
            # close function won't work and libuv will fail with an assertion failure
            ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream)
            notify_error(stream.readnotify, _UVError("read", nread))
        end

        # Stop background reading when
        # 1) there's nobody paying attention to the data we are reading
        # 2) we have accumulated a lot of unread data OR
        # 3) we have an alternate buffer that has reached its limit.
        nobody_listening = stream.status == StatusPaused
        if nobody_listening ||
           (stream.status == StatusActive &&
            ((bytesavailable(stream.buffer) >= stream.throttle) ||
             (bytesavailable(stream.buffer) >= stream.buffer.maxsize)))
            # save cycles by stopping kernel notifications from arriving
            ccall(:uv_read_stop, Cint, (Ptr{Cvoid},), stream)
            stream.status = StatusOpen
        end
        return nothing
    end
    readcb_specialized(stream_unknown_type, Int(nread), UInt(nrequested))
end
512 | |||
# Put a TTY that has seen EOF back into the Open state; no-op otherwise.
function reseteof(x::TTY)
    x.status == StatusEOF && (x.status = StatusOpen)
    return nothing
end
519 | |||
# Close hook: mark the object as closed with a null handle, then wake every
# task waiting on any of its conditions.
function _uv_hook_close(uv::Union{LibuvStream, LibuvServer})
    uv.handle = C_NULL
    uv.status = StatusClosed
    # notify any listeners that exist on this libuv stream type
    notify(uv.closenotify)
    if isdefined(uv, :readnotify)
        notify(uv.readnotify)
    end
    if isdefined(uv, :connectnotify)
        notify(uv.connectnotify)
    end
    return nothing
end
529 | |||
530 | |||
531 | ########################################## | ||
532 | # Pipe Abstraction | ||
533 | # (composed of two half-pipes: .in and .out) | ||
534 | ########################################## | ||
535 | |||
# A pipe made of two endpoints: data is written to `in` and read from `out`.
mutable struct Pipe <: AbstractPipe
    in::PipeEndpoint  # writable
    out::PipeEndpoint # readable
end
540 | |||
"""
Construct an uninitialized Pipe object.

The appropriate end of the pipe will be automatically initialized if
the object is used in process spawning. This can be useful to easily
obtain references in process pipelines, e.g.:

```
julia> err = Pipe()

# After this `err` will be initialized and you may read `foo`'s
# stderr from the `err` pipe.
julia> run(pipeline(pipeline(`foo`, stderr=err), `cat`), wait=false)
```
"""
function Pipe()
    return Pipe(PipeEndpoint(), PipeEndpoint())
end

# The readable side of a Pipe is `out`; the writable side is `in`.
function pipe_reader(p::Pipe)
    return p.out
end
function pipe_writer(p::Pipe)
    return p.in
end
559 | |||
# Connect both endpoints of `pipe` to a freshly created OS pipe and return
# `pipe`. The keyword flags are forwarded for the read and write side.
function link_pipe!(pipe::Pipe;
                    reader_supports_async = false,
                    writer_supports_async = false)
    reader, writer = pipe.out, pipe.in
    link_pipe!(reader, reader_supports_async, writer, writer_supports_async)
    return pipe
end
566 | |||
# Print a Pipe as `Pipe(fd status => fd status, N bytes waiting)`,
# with the writable end first.
function show(io::IO, stream::Pipe)
    print(io, "Pipe(",
          _fd(stream.in), " ", uv_status_string(stream.in), " => ",
          _fd(stream.out), " ", uv_status_string(stream.out), ", ",
          bytesavailable(stream), " bytes waiting)")
end
574 | |||
575 | |||
576 | ## Functions for PipeEndpoint and PipeServer ## | ||
577 | |||
# Attach the OS handle `handle` to the initialized endpoint `p` with the
# requested access flags, and mark `p` as Open.
# Errors if `p` is not in the Init state, or if `jl_pipe_open` fails.
function open_pipe!(p::PipeEndpoint, handle::OS_HANDLE, readable::Bool, writable::Bool)
    p.status == StatusInit || error("pipe is already in use or has been closed")
    rc = ccall(:jl_pipe_open, Int32, (Ptr{Cvoid}, OS_HANDLE, Cint, Cint),
               p.handle, handle, readable, writable)
    uv_error("open_pipe", rc)
    p.status = StatusOpen
    return p
end
587 | |||
588 | |||
# Create an OS pipe and attach its two ends to `read_end` and `write_end`.
# If attaching the read side fails, both raw handles are closed; if
# attaching the write side fails, the raw write handle is closed. The
# original exception is rethrown in either case.
function link_pipe!(read_end::PipeEndpoint, reader_supports_async::Bool,
                    write_end::PipeEndpoint, writer_supports_async::Bool)
    rd, wr = link_pipe(reader_supports_async, writer_supports_async)
    try
        try
            open_pipe!(read_end, rd, true, false)
        catch err
            close_pipe_sync(rd)
            rethrow(err)
        end
        read_end.status = StatusOpen
        open_pipe!(write_end, wr, false, true)
    catch err
        close_pipe_sync(wr)
        rethrow(err)
    end
    write_end.status = StatusOpen
    return nothing
end
608 | |||
# Create an OS-level pipe through `uv_pipe` and return the pair
# `read_handle => write_handle`. Each flag asks for a non-blocking handle on
# its side. Throws (via `uv_error`) on failure.
function link_pipe(reader_supports_async::Bool, writer_supports_async::Bool)
    UV_NONBLOCK_PIPE = 0x40
    # read (in) => write (out)
    handles = Ref{Pair{OS_HANDLE, OS_HANDLE}}(INVALID_OS_HANDLE => INVALID_OS_HANDLE)
    read_flags = reader_supports_async * UV_NONBLOCK_PIPE
    write_flags = writer_supports_async * UV_NONBLOCK_PIPE
    rc = ccall(:uv_pipe, Int32, (Ptr{Pair{OS_HANDLE, OS_HANDLE}}, Cint, Cint),
               handles, read_flags, write_flags)
    uv_error("pipe", rc)
    return handles[]
end
619 | |||
# Synchronously close a raw OS pipe handle with the platform's native call.
# The call's result is ignored.
if !Sys.iswindows()
    function close_pipe_sync(handle::RawFD)
        ccall(:close, Cint, (RawFD,), handle)
        return nothing
    end
else
    function close_pipe_sync(handle::WindowsRawSocket)
        ccall(:CloseHandle, stdcall, Cint, (WindowsRawSocket,), handle)
        return nothing
    end
end
631 | |||
632 | ## Functions for any LibuvStream ## | ||
633 | |||
634 | # flow control | ||
635 | |||
# Make sure the stream is actively reading.
# Returns 0 if it already was (or was merely paused), the `uv_read_start`
# result if reading had to be started, and -1 for any other state.
# Errors if the stream is Open but not readable.
function start_reading(stream::LibuvStream)
    status = stream.status
    if status == StatusActive
        return Int32(0)
    elseif status == StatusPaused
        stream.status = StatusActive
        return Int32(0)
    elseif status != StatusOpen
        return Int32(-1)
    end
    isreadable(stream) || error("tried to read a stream that is not readable")
    # libuv may call the alloc callback immediately
    # for a TTY on Windows, so ensure the status is set first
    stream.status = StatusActive
    return ccall(:uv_read_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}),
                 stream, uv_jl_alloc_buf::Ptr{Cvoid}, uv_jl_readcb::Ptr{Cvoid})
end
656 | |||
if !Sys.iswindows()
    # Elsewhere an Active stream is only marked Paused; no libuv call is
    # made here.
    function stop_reading(stream::LibuvStream)
        stream.status == StatusActive && (stream.status = StatusPaused)
        return nothing
    end
else
    # the low performance version of stop_reading is required
    # on Windows due to a NT kernel bug that we can't use a blocking
    # stream for non-blocking (overlapped) calls,
    # and a ReadFile call blocking on one thread
    # causes all other operations on that stream to lockup
    function stop_reading(stream::LibuvStream)
        if stream.status == StatusActive
            stream.status = StatusOpen
            ccall(:uv_read_stop, Cint, (Ptr{Cvoid},), stream)
        end
        return nothing
    end
end
678 | |||
679 | # bulk read / write | ||
680 | |||
# Convenience form: `nb` defaults to `length(a)` and is converted to `Int`.
readbytes!(s::LibuvStream, a::Vector{UInt8}, nb = length(a)) = readbytes!(s, a, Int(nb))

# Read up to `nb` bytes from `s` into `a`.
# Small requests are served from the stream's own buffer. Requests above
# SZ_UNBUFFERED_IO temporarily install `a` as the stream's buffer so data
# lands in it directly; the original buffer is always restored afterwards.
function readbytes!(s::LibuvStream, a::Vector{UInt8}, nb::Int)
    sbuf = s.buffer
    @assert sbuf.seekable == false
    @assert sbuf.maxsize >= nb

    # enough data is already buffered
    bytesavailable(sbuf) >= nb && return readbytes!(sbuf, a, nb)

    if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer
        wait_readnb(s, nb)
        return readbytes!(sbuf, a, nb)
    end

    try
        stop_reading(s) # Just playing it safe, since we are going to switch buffers.
        direct = PipeBuffer(a, maxsize = nb)
        direct.size = 0 # reset the write pointer to the beginning
        s.buffer = direct
        write(direct, sbuf)
        wait_readnb(s, Int(nb))
        compact(direct)
        return bytesavailable(direct)
    finally
        s.buffer = sbuf
        # resume reading iff there are currently other read clients of the stream
        isempty(s.readnotify.waitq) || start_reading(s)
    end
    @assert false # unreachable
end
713 | |||
# Read everything: wait with the largest possible byte count, then hand
# over the entire contents of the buffer.
function read(stream::LibuvStream)
    wait_readnb(stream, typemax(Int))
    contents = take!(stream.buffer)
    return contents
end
718 | |||
# Read exactly `nb` bytes from `s` into the memory at `p`.
# Small requests go through the stream's own buffer. Requests above
# SZ_UNBUFFERED_IO temporarily install a buffer wrapping `p` so data lands
# there directly; the original buffer is always restored afterwards.
# The large-request path throws `EOFError` when fewer than `nb` bytes arrive.
function unsafe_read(s::LibuvStream, p::Ptr{UInt8}, nb::UInt)
    sbuf = s.buffer
    @assert sbuf.seekable == false
    @assert sbuf.maxsize >= nb

    # enough data is already buffered
    bytesavailable(sbuf) >= nb && return unsafe_read(sbuf, p, nb)

    if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer
        wait_readnb(s, Int(nb))
        unsafe_read(sbuf, p, nb)
    else
        try
            stop_reading(s) # Just playing it safe, since we are going to switch buffers.
            direct = PipeBuffer(unsafe_wrap(Array, p, nb), maxsize = Int(nb))
            direct.size = 0 # reset the write pointer to the beginning
            s.buffer = direct
            write(direct, sbuf)
            wait_readnb(s, Int(nb))
            nb == bytesavailable(direct) || throw(EOFError())
        finally
            s.buffer = sbuf
            # resume reading iff there are currently other read clients of the stream
            isempty(s.readnotify.waitq) || start_reading(s)
        end
    end
    return nothing
end
749 | |||
# Read one byte: wait until at least one byte is buffered (or the stream is
# no longer open), then read it from the buffer.
function read(this::LibuvStream, ::Type{UInt8})
    wait_readnb(this, 1)
    buf = this.buffer
    @assert buf.seekable == false
    byte = read(buf, UInt8)
    return byte
end
756 | |||
# Wait until at least one byte is buffered (or the stream is no longer
# open), then return the buffer's entire contents.
function readavailable(this::LibuvStream)
    wait_readnb(this, 1)
    buf = this.buffer
    @assert buf.seekable == false
    contents = take!(buf)
    return contents
end
763 | |||
# Wait until delimiter `c` is buffered (or the stream is no longer open),
# then read up to it from the buffer. `keep` is forwarded unchanged.
function readuntil(this::LibuvStream, c::UInt8; keep::Bool=false)
    wait_readbyte(this, c)
    buf = this.buffer
    @assert buf.seekable == false
    line = readuntil(buf, c, keep=keep)
    return line
end
770 | |||
771 | uv_write(s::LibuvStream, p::Vector{UInt8}) = uv_write(s, pointer(p), UInt(sizeof(p))) | ||
772 | |||
773 |
3 (7.14%) samples spent in uv_write
# Write `n` bytes starting at `p` to `s` and block the current task until the
# final chunk's write callback fires (or reports an error). Returns `Int(n)`.
function uv_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
    req = uv_write_async(s, p, n)
    task = current_task()
    preserve_handle(task)
    try
        # wait for the last chunk to complete (or error)
        # assume that any errors would be sticky,
        # (so we don't need to monitor the error status of the intermediate writes)
        uv_req_set_data(req, task)
        wait()
    finally
        if uv_req_data(req) == C_NULL
            # the request's data field has been cleared again: done with req
            Libc.free(req)
        else
            # req is still alive,
            # so make sure we won't get spurious notifications later
            uv_req_set_data(req, C_NULL)
        end
        unpreserve_handle(task)
    end
    return Int(n)
end
796 | |||
# Helper for `uv_write`: queues the write(s) and returns the `uv_write_t`
# request of the final chunk instead of waiting on it.
# The payload is split into chunks of at most `MAX_OS_WRITE` bytes; a fresh
# request is malloc'ed per chunk. If queuing a chunk fails, that chunk's request
# is freed and `uv_error` is called.
function uv_write_async(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
    check_open(s)
    while true
        req = Libc.malloc(_sizeof_uv_write)
        uv_req_set_data(req, C_NULL) # in case we get interrupted before arriving at the wait call
        chunk = min(n, MAX_OS_WRITE) # split up the write into chunks the OS can handle.
        # TODO: use writev, when that is added to uv-win
        status = ccall(:jl_uv_write,
                       Int32,
                       (Ptr{Cvoid}, Ptr{Cvoid}, UInt, Ptr{Cvoid}, Ptr{Cvoid}),
                       s, p, chunk, req,
                       uv_jl_writecb_task::Ptr{Cvoid})
        if status < 0
            Libc.free(req)
            uv_error("write", status)
        end
        n -= chunk
        p += chunk
        n == 0 && return req
    end
end
822 | |||
823 | |||
824 | # Optimized send | ||
825 | # - smaller writes are buffered, final uv write on flush or when buffer full | ||
826 | # - large isbits arrays are unbuffered and written directly | ||
827 | |||
# Write `n` bytes from `p` to `s`.
# Without a send buffer the data goes straight to `uv_write`. With one, data is
# appended to the buffer while the total stays below `maxsize`; otherwise the
# stream is flushed first and the payload is then either written directly
# (when larger than `maxsize`) or buffered.
function unsafe_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
    sendbuf = s.sendbuf
    sendbuf === nothing && return uv_write(s, p, n)

    if bytesavailable(sendbuf) + n < sendbuf.maxsize
        return unsafe_write(sendbuf, p, n)
    end

    flush(s)
    return n > sendbuf.maxsize ? uv_write(s, p, n) : unsafe_write(sendbuf, p, n)
end
847 | |||
# Flush `s`: if a send buffer exists and holds data, its contents are taken and
# written out; otherwise a zero-length write is issued to flush the current queue.
function flush(s::LibuvStream)
    sendbuf = s.sendbuf
    if sendbuf !== nothing && bytesavailable(sendbuf) > 0
        pending = take!(sendbuf) # Array of UInt8s
        uv_write(s, pending)
        return
    end
    # zero write from a random pointer to flush current queue
    uv_write(s, Ptr{UInt8}(Base.eventloop()), UInt(0))
    return
end
860 | |||
# Install a fresh `PipeBuffer(bufsize)` as the stream's send buffer and
# return the stream.
function buffer_writes(s::LibuvStream, bufsize)
    s.sendbuf = PipeBuffer(bufsize)
    return s
end
862 | |||
863 | ## low-level calls to libuv ## | ||
864 | |||
# Write a single byte. The byte is appended to the send buffer when one exists
# and has room for it; otherwise it is written through the `Ref`-based `write`.
function write(s::LibuvStream, b::UInt8)
    sendbuf = s.sendbuf
    if sendbuf !== nothing && bytesavailable(sendbuf) + 1 < sendbuf.maxsize
        return write(sendbuf, b)
    end
    return write(s, Ref{UInt8}(b))
end
874 | |||
# Write-completion callback. If the request's data field holds a task, the field
# is cleared and the task is scheduled (with an error when `status < 0`).
# If the field is NULL the request has no owner and is freed here.
function uv_writecb_task(req::Ptr{Cvoid}, status::Cint)
    owner = uv_req_data(req)
    if owner == C_NULL
        # no owner for this req, safe to just free it
        Libc.free(req)
        return nothing
    end
    uv_req_set_data(req, C_NULL) # let the Task know we got the writecb
    task = unsafe_pointer_to_objref(owner)::Task
    if status < 0
        schedule(task, _UVError("write", status), error=true)
    else
        schedule(task)
    end
    return nothing
end
892 | |||
# File descriptor of an `IOStream`, wrapped as a `RawFD`.
function _fd(x::IOStream)
    return RawFD(fd(x))
end
894 | |||
# OS-level handle behind a libuv stream or server.
# Returns `INVALID_OS_HANDLE` when the object is uninitialized or closed, or
# when `uv_fileno` reports an error. The error case is checked explicitly rather
# than relying on the out-parameter being left untouched on failure.
function _fd(x::Union{LibuvStream, LibuvServer})
    fd = Ref{OS_HANDLE}(INVALID_OS_HANDLE)
    if x.status != StatusUninit && x.status != StatusClosed
        err = ccall(:uv_fileno, Int32, (Ptr{Cvoid}, Ptr{OS_HANDLE}), x.handle, fd)
        # handle errors by returning INVALID_OS_HANDLE
        err < 0 && return INVALID_OS_HANDLE
    end
    return fd[]
end
903 | |||
# Generate, for each of stdin/stdout/stderr:
#   * `_redirect_<name>(stream)`: dups the stream's descriptor onto the standard
#     descriptor number (and, on Windows, also calls SetStdHandle with
#     -10/-11/-12), then rebinds the Julia global `<name>` to `stream`.
#   * `redirect_<name>(handle)`: does the above and additionally stores
#     `handle.handle` into the corresponding `jl_uv_*` C global.
#   * `redirect_<name>()`: creates a linked pipe, redirects to its writable end
#     (readable end for stdin) and returns both ends as `(read, write)`.
for (name, writable, unix_fd, c_symbol) in ((:stdin,  false, 0, :jl_uv_stdin),
                                            (:stdout, true,  1, :jl_uv_stdout),
                                            (:stderr, true,  2, :jl_uv_stderr))
    redirect = Symbol("redirect_", lowercase(string(name)))
    redirect_impl = Symbol("_", redirect)
    @eval begin
        function ($redirect_impl)(stream)
            global $name
            src_fd = _fd(stream)
            @static if Sys.iswindows()
                ccall(:SetStdHandle, stdcall, Int32, (Int32, OS_HANDLE),
                      $(-10 - unix_fd), Libc._get_osfhandle(src_fd))
            end
            dup(src_fd, RawFD($unix_fd))
            $name = stream
            nothing
        end
        function ($redirect)(handle::Union{LibuvStream, IOStream})
            $(redirect_impl)(handle)
            unsafe_store!(cglobal($(Expr(:quote, c_symbol)), Ptr{Cvoid}),
                          handle.handle)
            return handle
        end
        function ($redirect)()
            pipe = link_pipe!(Pipe())
            rd, wr = pipe.out, pipe.in
            ($redirect)($(writable ? :wr : :rd))
            return (rd, wr)
        end
    end
end
936 | |||
937 | """ | ||
938 | redirect_stdout([stream]) -> (rd, wr) | ||
939 | |||
940 | Create a pipe to which all C and Julia level [`stdout`](@ref) output | ||
941 | will be redirected. | ||
942 | Returns a tuple `(rd, wr)` representing the pipe ends. | ||
943 | Data written to [`stdout`](@ref) may now be read from the `rd` end of | ||
944 | the pipe. The `wr` end is given for convenience in case the old | ||
945 | [`stdout`](@ref) object was cached by the user and needs to be replaced | ||
946 | elsewhere. | ||
947 | |||
948 | If called with the optional `stream` argument, then returns `stream` itself. | ||
949 | |||
950 | !!! note | ||
951 | `stream` must be a `TTY`, a `Pipe`, or a socket. | ||
952 | """ | ||
953 | redirect_stdout | ||
954 | |||
955 | """ | ||
956 | redirect_stderr([stream]) -> (rd, wr) | ||
957 | |||
958 | Like [`redirect_stdout`](@ref), but for [`stderr`](@ref). | ||
959 | |||
960 | !!! note | ||
961 | `stream` must be a `TTY`, a `Pipe`, or a socket. | ||
962 | """ | ||
963 | redirect_stderr | ||
964 | |||
965 | """ | ||
966 | redirect_stdin([stream]) -> (rd, wr) | ||
967 | |||
968 | Like [`redirect_stdout`](@ref), but for [`stdin`](@ref). | ||
969 | Note that the order of the return tuple is still `(rd, wr)`, | ||
970 | i.e. data to be read from [`stdin`](@ref) may be written to `wr`. | ||
971 | |||
972 | !!! note | ||
973 | `stream` must be a `TTY`, a `Pipe`, or a socket. | ||
974 | """ | ||
975 | redirect_stdin | ||
976 | |||
# Generate `redirect_<name>(f::Function, stream)` for stdin/stdout/stderr:
# redirect to `stream`, run `f()`, and always restore the previous stream
# afterwards. The value of `f()` is returned.
for S in (:stdin, :stdout, :stderr)
    F = Symbol("redirect_", S)
    @eval function $F(f::Function, stream)
        previous = $S
        $F(stream)
        try
            return f()
        finally
            $F(previous)
        end
    end
end
990 | |||
991 | """ | ||
992 | redirect_stdout(f::Function, stream) | ||
993 | |||
994 | Run the function `f` while redirecting [`stdout`](@ref) to `stream`. | ||
995 | Upon completion, [`stdout`](@ref) is restored to its prior setting. | ||
996 | |||
997 | !!! note | ||
998 | `stream` must be a `TTY`, a `Pipe`, or a socket. | ||
999 | """ | ||
1000 | redirect_stdout(f::Function, stream) | ||
1001 | |||
1002 | """ | ||
1003 | redirect_stderr(f::Function, stream) | ||
1004 | |||
1005 | Run the function `f` while redirecting [`stderr`](@ref) to `stream`. | ||
1006 | Upon completion, [`stderr`](@ref) is restored to its prior setting. | ||
1007 | |||
1008 | !!! note | ||
1009 | `stream` must be a `TTY`, a `Pipe`, or a socket. | ||
1010 | """ | ||
1011 | redirect_stderr(f::Function, stream) | ||
1012 | |||
1013 | """ | ||
1014 | redirect_stdin(f::Function, stream) | ||
1015 | |||
1016 | Run the function `f` while redirecting [`stdin`](@ref) to `stream`. | ||
1017 | Upon completion, [`stdin`](@ref) is restored to its prior setting. | ||
1018 | |||
1019 | !!! note | ||
1020 | `stream` must be a `TTY`, a `Pipe`, or a socket. | ||
1021 | """ | ||
1022 | redirect_stdin(f::Function, stream) | ||
1023 | |||
# Mark handling for libuv streams is forwarded to the stream's read buffer.
for fn in (:mark, :unmark, :reset, :ismarked)
    @eval $fn(x::LibuvStream) = $fn(x.buffer)
end
1028 | |||
# Return the next byte without consuming it: mark the stream, read one byte,
# and always reset back to the mark.
function peek(s::LibuvStream)
    mark(s)
    try
        return read(s, UInt8)
    finally
        reset(s)
    end
end
1036 | |||
# BufferStreams are non-OS streams, backed by a regular IOBuffer
mutable struct BufferStream <: LibuvStream
    buffer::IOBuffer         # holds the written-but-not-yet-read bytes
    r_c::Condition           # notified on write (unless buffering), flush and close
    close_c::Condition       # notified on close
    is_open::Bool            # cleared by `close`
    buffer_writes::Bool      # when true, writes do not notify readers
    lock::ReentrantLock

    function BufferStream()
        return new(PipeBuffer(), Condition(), Condition(), true, false, ReentrantLock())
    end
end
1048 | |||
# A BufferStream is open until `close` is called on it.
function isopen(s::BufferStream)
    return s.is_open
end

# Mark the stream closed and wake up tasks waiting for data or for closure.
function close(s::BufferStream)
    s.is_open = false
    for cond in (s.r_c, s.close_c)
        notify(cond)
    end
    return nothing
end

# Nothing to finalize for a BufferStream.
function uvfinalize(s::BufferStream)
    return nothing
end
1057 | |||
# Read one byte, waiting until a byte is buffered or the stream is closed.
function read(s::BufferStream, ::Type{UInt8})
    wait_readnb(s, 1)
    return read(s.buffer, UInt8)
end

# Read `nb` bytes into `a`, waiting until they are buffered or the stream is closed.
function unsafe_read(s::BufferStream, a::Ptr{UInt8}, nb::UInt)
    wait_readnb(s, Int(nb))
    return unsafe_read(s.buffer, a, nb)
end

# Number of bytes currently held in the backing buffer.
function bytesavailable(s::BufferStream)
    return bytesavailable(s.buffer)
end

# Readability / writability mirror the flags of the backing buffer.
function isreadable(s::BufferStream)
    return s.buffer.readable
end
function iswritable(s::BufferStream)
    return s.buffer.writable
end
1064 | |||
# Block on the read condition until at least `nb` bytes are buffered or the
# stream is no longer open.
function wait_readnb(s::BufferStream, nb::Int)
    while true
        if !isopen(s) || bytesavailable(s.buffer) >= nb
            return nothing
        end
        wait(s.r_c)
    end
end
1070 | |||
# One-line summary: number of buffered bytes and the open flag.
function show(io::IO, s::BufferStream)
    waiting = bytesavailable(s.buffer)
    print(io, "BufferStream() bytes waiting:", waiting, ", isopen:", s.is_open)
end
1072 | |||
# Block on the read condition until byte `c` occurs in the buffer or the
# stream is no longer open.
function wait_readbyte(s::BufferStream, c::UInt8)
    while true
        if !isopen(s) || occursin(c, s.buffer)
            return nothing
        end
        wait(s.r_c)
    end
end
1078 | |||
# Wait on the close condition, but only if the stream is still open.
function wait_close(s::BufferStream)
    return isopen(s) ? wait(s.close_c) : nothing
end

# Starting/stopping reads is a no-op for a BufferStream.
function start_reading(s::BufferStream)
    return Int32(0)
end
function stop_reading(s::BufferStream)
    return nothing
end
1082 | |||
# Single-byte writes go through the `Ref`-based `write`.
function write(s::BufferStream, b::UInt8)
    return write(s, Ref{UInt8}(b))
end

# Append `nb` bytes to the backing buffer; readers are notified right away
# unless write buffering has been enabled. Returns what the buffer write returned.
function unsafe_write(s::BufferStream, p::Ptr{UInt8}, nb::UInt)
    written = unsafe_write(s.buffer, p, nb)
    if !s.buffer_writes
        notify(s.r_c)
    end
    return written
end
1089 | |||
# Wait until a byte is available or the stream is closed; report end-of-file
# only when the stream is closed and nothing is left to read.
function eof(s::BufferStream)
    wait_readnb(s, 1)
    return !(isopen(s) || bytesavailable(s) > 0)
end
1094 | |||
# Once buffer_writes has been called, waiters are no longer notified on each
# write; notification is delayed until flush is called.
function buffer_writes(s::BufferStream, bufsize=0)
    s.buffer_writes = true
    return s
end

# Wake up tasks waiting on the read condition.
function flush(s::BufferStream)
    notify(s.r_c)
    return nothing
end