How to read from socket without blocking

Posted 2019-03-01 09:56

Question:

I ran into a problem when dealing with a server that sends me an initial "greeting header" (an SMTP server):

I need to read this header before sending any commands and receiving any answers from the server, but I don't know how to do this, because Julia seems to lack any way to read from an IO stream without blocking: the "read" command and its analogues have no non-blocking options, and nb_available is always 0, even though I know for certain that the server has sent me the header and my read buffer can't be empty (a "read" command issued right after "nb_available" gives me data immediately, without blocking).

julia> s=connect("smtp.mail.ru",25)
TCPSocket(RawFD(18) open, 0 bytes waiting)

julia> nb_available(s)
0

julia> nb_available(s)
0
(after 5 seconds or so...)
julia> nb_available(s)
0

julia> t=read(s,10)
10-element Array{UInt8,1}:
 0x32
 0x32
 0x30
 0x20
 0x73
 0x6d
 0x74
 0x70
 0x31
 0x34
(HOW, WHY???? nb_available==0, but read returns me 10 bytes?!)

... (read was repeated many times...)
julia> t=read(s,10)
^CERROR: InterruptException:
Stacktrace:
 [1] process_events at ./libuv.jl:82 [inlined]
 [2] wait() at ./event.jl:216
 [3] wait(::Condition) at ./event.jl:27
 [4] wait_readnb(::TCPSocket, ::Int64) at ./stream.jl:296
 [5] readbytes!(::TCPSocket, ::Array{UInt8,1}, ::Int64) at ./stream.jl:714
 [6] read(::TCPSocket, ::Int64) at ./io.jl:529
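As a quick check of the session above, the ten bytes returned by read can be decoded to text. This is only an illustrative sketch: the variable names are made up here, and the byte values are copied from the output above.

```julia
# Byte values copied from the `read(s, 10)` output in the transcript.
bytes = UInt8[0x32, 0x32, 0x30, 0x20, 0x73, 0x6d, 0x74, 0x70, 0x31, 0x34]

# `String` takes ownership of its argument, so decode a copy.
text = String(copy(bytes))
println(text)
```

The text starts with 220, the SMTP "service ready" reply code, so the data was indeed the beginning of the greeting even though nb_available reported 0.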

I don't want to use @async for the simplest case described above.

Does anyone know how to read from a TCP socket in non-blocking mode, so that I can determine whether the read buffer contains any data and/or whether the next read issued by the TCP client will block the whole client process?

Is it possible in Julia without using "green threads"?

Answer 1:

Since no-one's provided an "official" solution yet, here's the workaround I mentioned above.

Functions:

# Causes stagnant 'nb' count to be updated.
# Note asynchronous nature; this means refresh may not yet have occurred 
# when function has exited. 
function refreshBufsize(s)
  @async eof(s);
  return nothing;
end;

# Check if socket is blocked (refresh bytecount first)
# Note, since refresh is asynchronous, may misreport 'blockage' until
# 'refresh' operation is actually finished; however, if socket is actually
# unblocked, subsequent calls of this function will eventually properly
# report socket is not blocked, and in general, misreporting blockage once 
# or twice when socket is actually free is probably acceptable (rather 
# than other way round).
function isblocked(s)
  refreshBufsize(s)
  return nb_available(s) == 0;
end;

# Peek contents of socket without consuming stream
function peek(s, nb)
  refreshBufsize(s)
  s.buffer.seekable = true;
  Out = read(s.buffer, nb);
  seekstart(s.buffer);
  s.buffer.seekable = false
  return Out
end;
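The functions above use Julia 0.6 names. On Julia 0.7 and later, nb_available was renamed to bytesavailable and the TCP functions moved to the Sockets standard library. Below is a minimal sketch of the same idea under that assumption. The names refresh_buffer, is_blocked and wait_for_bytes, the port hint, and the greeting text are all hypothetical, chosen for illustration.

```julia
using Sockets

# Start a background read so that the socket's buffer gets filled.
# Returns immediately; the buffer may not be updated yet on return.
function refresh_buffer(sock::TCPSocket)
    @async eof(sock)
    return nothing
end

# True while no bytes are buffered, i.e. a read would have to wait.
function is_blocked(sock::TCPSocket)
    refresh_buffer(sock)
    return bytesavailable(sock) == 0
end

# Poll until data is buffered or `timeout` seconds pass.
# Returns the number of buffered bytes (0 on timeout).
function wait_for_bytes(sock::TCPSocket; timeout::Real=2.0, poll::Real=0.01)
    refresh_buffer(sock)
    deadline = time() + timeout
    while bytesavailable(sock) == 0 && time() < deadline
        sleep(poll)  # yields so the background read can make progress
    end
    return bytesavailable(sock)
end

# Loopback demo; `listenany` picks a free port starting from the hint.
port, server = listenany(9003)
out = connect(port)
inp = accept(server)

before = bytesavailable(inp)               # nothing has been written yet
write(out, "220 example.test ready\r\n")   # made-up greeting line
n = wait_for_bytes(inp)
greeting = String(read(inp, n))            # consumes only what is buffered

close(out); close(inp); close(server)
```

wait_for_bytes starts a single background read and then polls, rather than calling is_blocked in a loop, so that a silent peer does not leave a pile of waiting tasks behind.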

Example: (console outputs denoted as "#>" comments, for copy-pastable code)

server = listen(9001); 
sOut   = connect(9001); 
sIn    = accept(server);

nb_available(sIn)       
#> 0
isblocked(sIn)          
#> true
refreshBufsize(sIn); # we expect no change, as we haven't written anything yet
isblocked(sIn)          
#> true
write(sOut, "Greetinks and salutations!\n")  
#> 27
write(sOut, "We would be honoured if you would join us.\n") 
#> 43
refreshBufsize(sIn);
isblocked(sIn) # note: may say true at first (until refresh properly finished)
#> false
nb_available(sIn) 
#> 27
String( peek( sIn, 10)) # peek socket contents without consuming
#> "Greetinks "
String( read( sIn, nb_available( sIn))) # read (consume) as normal
#> "Greetinks and salutations!\n"
nb_available(sIn) # note 0 even though second buffer awaiting. needs refresh!
#> 0
isblocked(sIn) # note: called "refresh" under the hood 
               # (but keep async in mind, i.e. might say 'true' at first!)
#> false
nb_available(sIn)
#> 43
String( read( sIn, nb_available( sIn)))
#> "We would be honoured if you would join us.\n"
isblocked(sIn)
#> true

EDIT: for comparison, a more typical "asynchronous" socket session (which typically relies on such "blocking" behaviour) would probably look something like this:

server = listen(9002);
sOut   = connect(9002);
sIn    = accept(server);

TaskRef = @async try 
  while true
    In = String(readavailable(sIn));
    if !isempty(In); println("Received from server: $In"); else; break; end
  end
  println("Connection closed normally");
catch E
  println("Connection closed (with status $E)");
end;

write(sOut, "Stop repeating everything I say!\n");
#> Received from server: Stop repeating everything I say!

close(sIn)
#> Connection closed normally
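For the SMTP case in the question specifically, the greeting is line-oriented, so a blocking readline may be enough: it returns as soon as a complete line has arrived. The sketch below assumes Julia 1.x and the SMTP convention that "220-" marks a continuation line while "220 " marks the last line of a reply. The function name read_smtp_reply and the sample text are hypothetical, and an IOBuffer stands in for the socket so the example runs without a network.

```julia
# Read one (possibly multi-line) SMTP reply from any IO stream.
function read_smtp_reply(io::IO)
    lines = String[]
    while !eof(io)
        line = readline(io)  # trailing "\r\n" is stripped by default
        push!(lines, line)
        # A '-' in the 4th byte means more lines follow; anything else ends the reply.
        if ncodeunits(line) < 4 || codeunit(line, 4) != UInt8('-')
            break
        end
    end
    return lines
end

# Stand-in for a socket: a two-line greeting followed by an unrelated reply.
io = IOBuffer("220-example.test ESMTP\r\n220 ready\r\n250 OK\r\n")
reply = read_smtp_reply(io)
rest = readline(io)  # the next reply is left unread in the stream
```

With a real connection the same function could be called on the TCPSocket right after connect; it still blocks the calling task until the greeting arrives, which is usually what a client wants before sending its first command.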