I ran into a problem with a server that sends an initial "greeting header" on connect (an SMTP server):
I need to read this header before sending any commands and receiving any answers from the server, but I don't know how to do this, because Julia seems to lack any way to read from an IO stream without blocking. The "read" command and its analogues do not have any non-blocking options, and nb_available is always 0, even though I know for certain that the server has sent the header and my read buffer can't be empty (a "read" issued right after "nb_available" gives me data immediately, without blocking).
julia> s=connect("smtp.mail.ru",25)
TCPSocket(RawFD(18) open, 0 bytes waiting)
julia> nb_available(s)
0
julia> nb_available(s)
0
(after 5 seconds or so...)
julia> nb_available(s)
0
julia> t=read(s,10)
10-element Array{UInt8,1}:
0x32
0x32
0x30
0x20
0x73
0x6d
0x74
0x70
0x31
0x34
(How? Why? nb_available == 0, but read returns 10 bytes?!)
... (read was repeated many times...)
julia> t=read(s,10)
^CERROR: InterruptException:
Stacktrace:
[1] process_events at ./libuv.jl:82 [inlined]
[2] wait() at ./event.jl:216
[3] wait(::Condition) at ./event.jl:27
[4] wait_readnb(::TCPSocket, ::Int64) at ./stream.jl:296
[5] readbytes!(::TCPSocket, ::Array{UInt8,1}, ::Int64) at ./stream.jl:714
[6] read(::TCPSocket, ::Int64) at ./io.jl:529
I don't want to use @async for the simple case described above.
Does anyone know how to read from a TCP socket in non-blocking mode, so that I can determine whether the read buffer contains any data and/or whether the next read issued by the TCP client will block the whole client process?
Is this possible in Julia without using "green threads"?
Since no-one's provided an "official" solution yet, here's the workaround I mentioned above.
Functions:
# Causes stagnant 'nb' count to be updated.
# Note asynchronous nature; this means refresh may not yet have occurred
# when function has exited.
function refreshBufsize(s)
    @async eof(s)
    return nothing
end;
# Check if socket is blocked (refresh bytecount first)
# Note, since refresh is asynchronous, may misreport 'blockage' until
# 'refresh' operation is actually finished; however, if socket is actually
# unblocked, subsequent calls of this function will eventually properly
# report socket is not blocked, and in general, misreporting blockage once
# or twice when socket is actually free is probably acceptable (rather
# than other way round).
function isblocked(s)
    refreshBufsize(s)
    return nb_available(s) == 0
end;
# Peek contents of socket without consuming stream
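function peek(s, nb)
    refreshBufsize(s)
    pos = position(s.buffer)     # remember the current read position
    s.buffer.seekable = true     # temporarily allow seeking on the internal buffer
    Out = read(s.buffer, nb)
    seek(s.buffer, pos)          # rewind so the peeked bytes remain unread
    s.buffer.seekable = false
    return Out
end;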
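For readers on a newer Julia, here is a rough port of the first two helpers. It assumes Julia 1.x, where the TCP functions live in the Sockets standard library and nb_available has been renamed to bytesavailable. The name refresh_bufsize is my own, and this is only a sketch of the same workaround, not an official API:

```julia
using Sockets  # assumption: Julia 1.x, where TCP support is in the Sockets stdlib

# Hypothetical helper: start a background task that blocks in eof(),
# which makes the runtime start reading and fill the socket's buffer.
function refresh_bufsize(s::TCPSocket)
    @async eof(s)
    return nothing
end

# True when no buffered bytes are visible yet. Because the refresh is
# asynchronous, this may report `true` once or twice after data was sent.
function isblocked(s::TCPSocket)
    refresh_bufsize(s)
    return bytesavailable(s) == 0
end
```

As with the original, the caller has to return to the scheduler (for example via sleep or yield) before the refreshed count becomes visible.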
Example: (console outputs denoted as "#>" comments, for copy-pastable code)
server = listen(9001);
sOut = connect(9001);
sIn = accept(server);
nb_available(sIn)
#> 0
isblocked(sIn)
#> true
refreshBufsize(sIn); # we expect no change, as we haven't written anything yet
isblocked(sIn)
#> true
write(sOut, "Greetinks and salutations!\n")
#> 27
write(sOut, "We would be honoured if you would join us.\n")
#> 43
refreshBufsize(sIn);
isblocked(sIn) # note: may say true at first (until refresh properly finished)
#> false
nb_available(sIn)
#> 27
String( peek( sIn, 10)) # peek socket contents without consuming
#> "Greetinks "
String( read( sIn, nb_available( sIn))) # read (consume) as normal
#> "Greetinks and salutations!\n"
nb_available(sIn) # note 0 even though second buffer awaiting. needs refresh!
#> 0
isblocked(sIn) # note: called "refresh" under the hood
# (but keep async in mind, i.e. might say 'true' at first!)
#> false
nb_available(sIn)
#> 43
String( read( sIn, nb_available( sIn)))
#> "We would be honoured if you would join us.\n"
isblocked(sIn)
#> true
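Since the session above has to poll until the refresh has finished, one possible way to package that polling is a small helper with a timeout. This is a sketch under assumptions: it targets Julia 1.x (Sockets stdlib, bytesavailable instead of nb_available), the name wait_for_data and the 0.05 s poll interval are made up for illustration, and it leans on Base's timedwait:

```julia
using Sockets  # assumption: Julia 1.x

# Hypothetical helper: wait up to `secs` seconds for buffered data on `s`.
# Returns true if at least one byte can then be read without blocking.
function wait_for_data(s::TCPSocket, secs::Real)
    bytesavailable(s) > 0 && return true
    @async eof(s)   # background reader that triggers the buffer refresh
    status = timedwait(() -> bytesavailable(s) > 0, float(secs); pollint=0.05)
    return status == :ok
end
```

A caller could then write something like `wait_for_data(sIn, 2) && read(sIn, bytesavailable(sIn))`. Note that a peer closing the connection without sending anything shows up here as a timeout, not as an error.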
EDIT: for comparison, a more typical "asynchronous" socket session (which typically relies on such "blocking" behaviour) would probably look something like this:
server = listen(9002);
sOut = connect(9002);
sIn = accept(server);
TaskRef = @async try
    while true
        In = String(readavailable(sIn))
        if !isempty(In)
            println("Received from server: $In")
        else
            break
        end
    end
    println("Connection closed normally")
catch E
    println("Connection closed (with status $E)")
end;
write(sOut, "Stop repeating everything I say!\n");
#> Received from server: Stop repeating everything I say!
close(sIn)
#> Connection closed normally
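For the original use case (reading a single greeting line before sending commands), the same task-based style can be combined with a timeout. The following is a minimal sketch, assuming Julia 1.x; readline_timeout is a hypothetical name, and it races a reader task against Base's timedwait:

```julia
using Sockets  # assumption: Julia 1.x

# Hypothetical helper: read one line from `s`, giving up after `secs` seconds.
# Returns the line (without its line ending) or `nothing` on timeout.
function readline_timeout(s::TCPSocket, secs::Real)
    reader = @async readline(s)   # blocks only this task, not the caller
    status = timedwait(() -> istaskdone(reader), float(secs); pollint=0.05)
    return status == :ok ? fetch(reader) : nothing
end
```

One caveat of this design: after a timeout the reader task is still waiting and would consume the next line that arrives, so it probably makes sense to close the socket when `nothing` is returned.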