Send data to multiple sockets using pipes, tee() a

Posted 2019-03-10 02:59

I'm duplicating a "master" pipe with tee() to write to multiple sockets using splice(). Naturally these pipes will get emptied at different rates depending on how much I can splice() to the destination sockets. So when I next go to add data to the "master" pipe and then tee() it again, I may have a situation where I can write 64KB to the pipe but only tee 4KB to one of the "slave" pipes. I'm guessing then that if I splice() all of the "master" pipe to the socket, I will never be able to tee() the remaining 60KB to that slave pipe. Is that true? I guess I can keep track of a tee_offset (starting at 0) which I set to the start of the "unteed" data and then don't splice() past it. So in this case I would set tee_offset to 4096 and not splice more than that until I'm able to tee it to all the other pipes. Am I on the right track here? Any tips/warnings for me?

1 answer
一纸荒年 Trace。
Reply #2 · 2019-03-10 03:48

If I understand correctly, you've got some realtime source of data that you want to multiplex to multiple sockets. You've got a single "source" pipe hooked up to whatever's producing your data, and you've got a "destination" pipe for each socket over which you wish to send the data. What you're doing is using tee() to copy data from the source pipe to each of the destination pipes and splice() to copy it from the destination pipes to the sockets themselves.

The fundamental issue you're going to hit here is if one of the sockets simply can't keep up - if you're producing data faster than you can send it, then you're going to have a problem. This isn't related to your use of pipes, it's just a fundamental issue. So, you'll want to pick a strategy to cope in this case - I suggest handling this even if you don't expect it to be common as these things often come up to bite you later. Your basic choices are to either close the offending socket, or to skip data until it's cleared its output buffer - the latter choice might be more suitable for audio/video streaming, for example.

The issue which is related to your use of pipes, however, is that on Linux the size of a pipe's buffer is somewhat inflexible. It has defaulted to 64K (16 pages of 4096 bytes) since Linux 2.6.11 (the tee() call was added in 2.6.17) - see the pipe(7) manpage. Since 2.6.35 this value can be changed via the F_SETPIPE_SZ option to fcntl() (see the fcntl(2) manpage) up to the limit specified by /proc/sys/fs/pipe-max-size, but the buffering is still more awkward to change on-demand than a dynamically allocated scheme in user-space would be. This means that your ability to cope with slow sockets will be somewhat limited - whether this is acceptable depends on the rate at which you expect to receive and be able to send data.
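As a rough illustration of the resizing mentioned above, here is a minimal sketch. The helper name resize_pipe is made up for this example; only F_GETPIPE_SZ and F_SETPIPE_SZ come from fcntl(). The kernel may round the request up, so the sketch reads the capacity back rather than assuming it got exactly what it asked for.

```c
#define _GNU_SOURCE
#include <fcntl.h>
#include <unistd.h>

/* Hypothetical helper: asks the kernel to resize the pipe behind fd to at
 * least `bytes`. Returns the capacity actually in effect afterwards, or -1
 * with errno set (for example EPERM when an unprivileged process asks for
 * more than the system-wide limit). */
int resize_pipe(int fd, int bytes)
{
    if (fcntl(fd, F_SETPIPE_SZ, bytes) == -1)
        return -1;
    /* The kernel may round the size up, so query the real value. */
    return fcntl(fd, F_GETPIPE_SZ);
}
```

Either end of the pipe can be passed, since both descriptors refer to the same buffer.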

Assuming this buffering strategy is acceptable, you're correct in your assumption that you'll need to track how much data each destination pipe has consumed from the source, and it's only safe to discard data which all destination pipes have consumed. This is somewhat complicated by the fact that tee() doesn't have the concept of an offset - you can only copy from the start of the pipe. The consequence of this is that you can only copy at the speed of the slowest socket: once a destination pipe has received the data at the head of the source, you can't tee() anything further to it until that data has been consumed from the source, and you can't consume it until every destination pipe has received it.
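To make the "no offset" point concrete, here is a small self-contained sketch (the function name tee_twice_demo and the sample data are just for illustration). It puts 8 bytes in a source pipe and calls tee() for 4 bytes twice; because tee() never consumes from the source, the second call copies the same first 4 bytes again rather than the next 4.

```c
#define _GNU_SOURCE
#include <fcntl.h>
#include <unistd.h>

/* Illustrative only: writes "abcdefgh" to a source pipe, tees 4 bytes to a
 * destination pipe twice, then reads the destination back into `out`.
 * Returns the number of bytes read, or -1 on error. */
ssize_t tee_twice_demo(char *out, size_t out_len)
{
    int src[2], dst[2];
    ssize_t n = -1;

    if (pipe(src) == -1)
        return -1;
    if (pipe(dst) == -1) {
        close(src[0]);
        close(src[1]);
        return -1;
    }

    /* Both tee() calls start at the head of src; neither consumes it. */
    if (write(src[1], "abcdefgh", 8) == 8 &&
        tee(src[0], dst[1], 4, SPLICE_F_NONBLOCK) == 4 &&
        tee(src[0], dst[1], 4, SPLICE_F_NONBLOCK) == 4)
        n = read(dst[0], out, out_len);

    close(src[0]);
    close(src[1]);
    close(dst[0]);
    close(dst[1]);
    return n;
}
```

The destination ends up holding "abcdabcd", which is why the head of the source has to be discarded before a destination can be given anything new.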

How you handle this depends on the importance of your data. If you really need the speed of tee() and splice(), and you're confident that a slow socket will be an extremely rare event, you could do something like this (I've assumed you're using non-blocking IO and a single thread, but something similar would also work with multiple threads):

  1. Make sure all pipes are non-blocking (use fcntl(fd, F_SETFL, O_NONBLOCK) to make each file descriptor fd non-blocking).
  2. Initialise a read_counter variable for each destination pipe to zero.
  3. Use something like epoll to wait until there's something in the source pipe.
  4. Loop over all destination pipes where read_counter is zero, calling tee() to transfer data to each one. Make sure you pass SPLICE_F_NONBLOCK in the flags.
  5. Increment read_counter for each destination pipe by the amount transferred by tee().
  6. Find the lowest value of read_counter across all destination pipes - if this is non-zero, then discard that amount of data from the source pipe (using a splice() call with a destination opened on /dev/null, for example). After discarding data, subtract the amount discarded from read_counter on all the pipes (since this was the lowest value, this cannot make any of them negative).
  7. Repeat from step 3.
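Steps 4 to 6 above could be sketched roughly as follows. This is only an outline under a few assumptions: struct dest, fan_out_once and devnull_fd are names invented for this example, the caller has already waited for the source to become readable (step 3), and devnull_fd is a descriptor opened on /dev/null for writing.

```c
#define _GNU_SOURCE
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <stdint.h>
#include <unistd.h>

/* Hypothetical per-destination bookkeeping. */
struct dest {
    int pipe_wr;         /* write end of this destination's pipe */
    size_t read_counter; /* bytes at the head of the source already teed here */
};

/* One pass over steps 4-6. Returns the number of bytes discarded from the
 * source pipe (0 if some destination has nothing yet), or -1 on error. */
ssize_t fan_out_once(int src_rd, int devnull_fd, struct dest *d, size_t n)
{
    size_t lowest = SIZE_MAX;

    for (size_t i = 0; i < n; i++) {
        if (d[i].read_counter == 0) {
            /* Step 4: copy from the head of the source without consuming. */
            ssize_t t = tee(src_rd, d[i].pipe_wr, INT_MAX, SPLICE_F_NONBLOCK);
            if (t > 0)
                d[i].read_counter += (size_t)t; /* step 5 */
            else if (t == -1 && errno != EAGAIN)
                return -1;
        }
        if (d[i].read_counter < lowest)
            lowest = d[i].read_counter;
    }
    if (n == 0 || lowest == 0)
        return 0; /* at least one destination has not received the head yet */

    /* Step 6: drop the bytes every destination now holds. */
    size_t left = lowest;
    while (left > 0) {
        ssize_t s = splice(src_rd, NULL, devnull_fd, NULL, left,
                           SPLICE_F_NONBLOCK);
        if (s <= 0)
            return -1;
        left -= (size_t)s;
    }
    for (size_t i = 0; i < n; i++)
        d[i].read_counter -= lowest;
    return (ssize_t)lowest;
}
```

A return value of 0 while the source still holds data means one destination could not accept anything, which is the situation where you would close or disconnect that socket.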

Note: one thing that's tripped me up in the past is that SPLICE_F_NONBLOCK affects whether the tee() and splice() operations on the pipes are non-blocking, and the O_NONBLOCK you set with fcntl() affects whether the interactions with other calls (e.g. read() and write()) are non-blocking. If you want everything to be non-blocking, set both. Also remember to make your sockets non-blocking or the splice() calls to transfer data to them might block (unless that's what you want, if you're using a threaded approach).
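For the O_NONBLOCK half of that, a minimal sketch (set_nonblocking is just an illustrative name) is to read the current status flags first and OR the new flag in, since passing O_NONBLOCK alone to F_SETFL would replace any other status flags already set on the descriptor:

```c
#include <fcntl.h>

/* Illustrative helper: adds O_NONBLOCK to whatever status flags fd already
 * has. Returns 0 on success, -1 on error with errno set. */
int set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL);
    if (flags == -1)
        return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1 ? -1 : 0;
}
```

The same helper can be applied to the pipe ends and to the sockets.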

As you can see, this strategy has a major problem - as soon as one socket blocks up, everything halts - the destination pipe for that socket will fill up, and then the source pipe will become stagnant. So, if you reach the stage where tee() returns EAGAIN in step 4 then you'll want to either close that socket, or at least "disconnect" it (i.e. take it out of your loop) such that you don't write anything else to it until its output buffer is empty. Which you choose depends on whether your data stream can recover from having bits of it skipped.
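One way to detect that condition is sketched below. The names tee_or_stall and enum tee_status are invented for this example. One caveat I'm assuming you handle in the caller: tee() with SPLICE_F_NONBLOCK also fails with EAGAIN when the source pipe is empty, so this should only be called once the source is known to hold data.

```c
#define _GNU_SOURCE
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <unistd.h>

enum tee_status { TEE_COPIED, TEE_STALLED, TEE_FAILED };

/* Tries to tee the head of the source pipe into one destination pipe.
 * Call only when the source is known to be non-empty; otherwise an empty
 * source would be misreported as a stalled destination. */
enum tee_status tee_or_stall(int src_rd, int dst_wr, size_t *copied)
{
    ssize_t n = tee(src_rd, dst_wr, INT_MAX, SPLICE_F_NONBLOCK);

    *copied = 0;
    if (n > 0) {
        *copied = (size_t)n;
        return TEE_COPIED;
    }
    if (n == -1 && errno == EAGAIN)
        return TEE_STALLED; /* destination pipe is full: close or disconnect */
    return TEE_FAILED;      /* hard error, or no writers left on the source */
}
```

On TEE_STALLED the caller would either close the socket or take it out of the loop, as described above.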

If you want to cope with network latency more gracefully then you're going to need to do more buffering, and this is going to involve either user-space buffers (which rather negates the advantages of tee() and splice()) or perhaps a disk-based buffer. The disk-based buffering will almost certainly be significantly slower than user-space buffering, and hence not appropriate given that presumably you want a lot of speed since you've chosen tee() and splice() in the first place, but I mention it for completeness.

One thing that's worth noting if you end up inserting data from user-space at any point is the vmsplice() call which can perform "gather output" from user-space into a pipe, in a similar way to the writev() call. This might be useful if you're doing enough buffering that you've split your data among multiple different allocated buffers (for example if you're using a pool allocator approach).
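A minimal sketch of that gather step, assuming just two user-space buffers (gather_into_pipe is a made-up helper name): each buffer becomes one struct iovec entry and a single vmsplice() call feeds them into the pipe in order. As far as I understand it, the pipe may reference the user pages directly rather than copying them, so it's safest not to modify the buffers until the data has been consumed.

```c
#define _GNU_SOURCE
#include <fcntl.h>
#include <sys/uio.h>
#include <unistd.h>

/* Illustrative helper: pushes two separately allocated buffers into a pipe
 * with one vmsplice() call. Returns the number of bytes accepted (which may
 * be fewer than requested if the pipe fills up), or -1 on error. */
ssize_t gather_into_pipe(int pipe_wr,
                         const void *a, size_t a_len,
                         const void *b, size_t b_len)
{
    struct iovec iov[2] = {
        { .iov_base = (void *)a, .iov_len = a_len },
        { .iov_base = (void *)b, .iov_len = b_len },
    };
    return vmsplice(pipe_wr, iov, 2, 0);
}
```

With a pool allocator you would build the iovec array from however many chunks are pending rather than a fixed two.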

Finally, you could imagine swapping sockets between the "fast" scheme of using tee() and splice() and, if they fail to keep up, moving them on to a slower user-space buffering. This is going to complicate your implementation, but if you're handling large numbers of connections and only a very small proportion of them are slow then you're still reducing the amount of copying to user-space that's involved somewhat. However, this would only ever be a short-term measure to cope with transient network issues - as I said originally, you've got a fundamental problem if your sockets are slower than your source. You'd eventually hit some buffering limit and need to skip data or close connections.

Overall, I would carefully consider why you need the speed of tee() and splice() and whether, for your use-case, simply user-space buffering in memory or on disk would be more appropriate. If you're confident that the speeds will always be high, however, and limited buffering is acceptable then the approach I outlined above should work.

Also, one thing I should mention is that this will make your code extremely Linux-specific - I'm not aware of these calls being supported in other Unix variants. The sendfile() call is more restricted than splice(), but might be rather more portable. If you really want things to be portable, stick to user-space buffering.

Let me know if there's anything I've covered which you'd like more detail on.
