Using the same istream with a resizable streambuf

2019-03-06 08:27发布

I am reading data off a socket using boost asio async_read(). I have the following members of the reader class which persist through the lifetime of the program:

// Buffer that async_read() fills with the bytes received from the socket.
boost::asio::streambuf recv_data;
// Input stream reading from recv_data; it is constructed from a pointer to the streambuf object.
std::istream stream_is(&recv_data);

The async_read call looks like this:

// Read exactly n bytes from the endpoint's socket into recv_data, then invoke
// IProtocol::handle_data_recv on `protocol` with the resulting error code.
boost::asio::async_read(ep_ptr->get_sock(), recv_data, boost::asio::transfer_exactly(n),
                              boost::bind(&IProtocol::handle_data_recv, &protocol,
                                          boost::asio::placeholders::error));

My question is: what happens if I read 'n' bytes off the socket and the streambuf is smaller than 'n', so it resizes itself? Do I need to re-create the std::istream, since the internal streambuf buffer that the std::istream is holding might now be freed/deallocated?

1条回答
Lonely孤独者°
2楼-- · 2019-03-06 08:42

No, thankfully. The std::istream binds to the recv_data object itself rather than to its internal storage, so it doesn't care that recv_data's internals may get reallocated. Here is a working example of a downloader I wrote; as you can see, I do not re-allocate the buffer in between reads.

In the same way, you can safely share a reference to a vector and not care if the internals of the vector are re-allocated (unless you start pointing at memory addresses of the vector's elements directly, or use iterators after they become invalidated). The handle to the vector remains valid; likewise, the handle to the streambuf remains valid for the istream, and it works just fine.

download.h

#ifndef _MV_DOWNLOAD_H_
#define _MV_DOWNLOAD_H_

#include <string>
#include <iostream>
#include <istream>
#include <ostream>
#include <fstream>
#include <algorithm>
#include "Network/url.h"
#include "Utility/generalUtility.h"
#include <boost/asio.hpp>
#include <boost/bind.hpp>

namespace MV {
    // Parsed status line and header fields of an HTTP response, together with
    // the outcome of the download that produced it.
    struct HttpHeader {
        // First whitespace-delimited token of the status line.
        std::string version;
        // Numeric status code; 0 when it could not be parsed.
        int status = 0;
        // Remainder of the status line after the status code.
        std::string message;
        // Header fields keyed by lower-cased field name.
        std::map<std::string, std::string> values;

        // URLs that answered with a redirect which was then followed.
        std::vector<std::string> bounces;

        bool success = false;
        std::string errorMessage;

        // Initialised explicitly: a default-constructed header, or a response
        // without a Content-Length field, must not expose an indeterminate value.
        size_t contentLength = 0;

        HttpHeader() = default;

        // Constructs the header by parsing response_stream (see read()).
        HttpHeader(std::istream& response_stream) {
            read(response_stream);
        }

        // Parses the status line and header fields from response_stream.
        void read(std::istream& response_stream);
    };


    // Writes a human-readable dump of the header to os: the status line parts,
    // every header field, and the numbered list of redirect bounces. The final
    // line is terminated with std::endl, so the stream is flushed.
    inline std::ostream& operator<<(std::ostream& os, const HttpHeader& obj) {
        os << "\\/______HTTP_HEADER______\\/\nVersion [" << obj.version << "] Status [" << obj.status << "] Message [" << obj.message << "]\n"
           << "||-----------------------||\n";
        for (const auto& field : obj.values) {
            os << "[" << field.first << "]: " << field.second << "\n";
        }

        os << "\n||--------Bounces--------||\n";
        size_t position = 0;
        for (const auto& bounce : obj.bounces) {
            os << position << ": " << bounce << "\n";
            ++position;
        }

        return os << "/\\_______________________/\\" << std::endl;
    }
    // Stream extraction for HttpHeader: delegates the parsing to
    // HttpHeader::read and returns the stream so extractions can be chained.
    inline std::istream& operator>>(std::istream& a_is, HttpHeader& a_obj) {
        a_obj.read(a_is);
        return a_is;
    }

    class DownloadRequest : public std::enable_shared_from_this<DownloadRequest> {
    public:
        static std::shared_ptr<DownloadRequest> make(const MV::Url& a_url, const std::shared_ptr<std::ostream> &a_streamOutput) {
            auto result = std::shared_ptr<DownloadRequest>(new DownloadRequest(a_streamOutput));
            result->perform(a_url);
            return result;
        }

        //onComplete is called on success or error at the end of the download.
        static std::shared_ptr<DownloadRequest> make(const std::shared_ptr<boost::asio::io_service> &a_ioService, const MV::Url& a_url, const std::shared_ptr<std::ostream> &a_streamOutput, std::function<void (std::shared_ptr<DownloadRequest>)> a_onComplete) {
            auto result = std::shared_ptr<DownloadRequest>(new DownloadRequest(a_streamOutput));
            result->onComplete = a_onComplete;
            result->ioService = a_ioService;
            result->perform(a_url);
            return result;
        }

        HttpHeader& header() {
            return headerData;
        }

        MV::Url finalUrl() {
            return currentUrl;
        }

        MV::Url inputUrl() {
            return originalUrl;
        }

    private:
        DownloadRequest(const std::shared_ptr<std::ostream> &a_streamOutput) :
            streamOutput(a_streamOutput) {
        }

        void perform(const MV::Url& a_url);

        bool initializeSocket();

        void initiateRequest(const MV::Url& a_url);

        void handleResolve(const boost::system::error_code& err, boost::asio::ip::tcp::resolver::iterator endpoint_iterator);
        void handleConnect(const boost::system::error_code& err, boost::asio::ip::tcp::resolver::iterator endpoint_iterator);
        void handleWriteRequest(const boost::system::error_code& err);
        void handleReadHeaders(const boost::system::error_code& err);
        void handleReadContent(const boost::system::error_code& err);

        void readResponseToStream() {
            (*streamOutput) << &(*response);
        }

        std::shared_ptr<boost::asio::io_service> ioService;
        std::unique_ptr<boost::asio::ip::tcp::resolver> resolver;
        std::unique_ptr<boost::asio::ip::tcp::socket> socket;

        std::unique_ptr<std::istream> responseStream;

        std::unique_ptr<boost::asio::streambuf> request;
        std::unique_ptr<boost::asio::streambuf> response;

        std::shared_ptr<std::ostream> streamOutput;

        HttpHeader headerData;

        MV::Url currentUrl;
        MV::Url originalUrl;

        std::function<void(std::shared_ptr<DownloadRequest>)> onComplete;
    };

    // Downloads a_url with a blocking request and returns the response body,
    // or an empty string when the request did not succeed.
    std::string DownloadString(const MV::Url& a_url);

    // Blocking download of a_url into the file a_path. The parent directories of
    // a_path are created first; the file is removed again when the request did
    // not succeed. Returns the header of the finished request.
    HttpHeader DownloadFile(const MV::Url& a_url, const std::string &a_path);
    // Same as above, but the request is queued on a_ioService. On failure the
    // file is removed; afterwards a_onComplete (if set) receives the request.
    void DownloadFile(const std::shared_ptr<boost::asio::io_service> &a_ioService, const MV::Url& a_url, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete = std::function<void(std::shared_ptr<DownloadRequest>)>());

    // Downloads every URL using an io_service created and run by this function.
    // Each target path is a_path with the file name of the URL's path appended
    // directly (no separator is inserted).
    void DownloadFiles(const std::vector<MV::Url>& a_url, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete = std::function<void(std::shared_ptr<DownloadRequest>)>());
    // Queues one download per URL on a_ioService without running the service.
    // a_onComplete is passed each finished request; a_onAllComplete is intended
    // to signal that the last download has finished.
    void DownloadFiles(const std::shared_ptr<boost::asio::io_service> &a_ioService, const std::vector<MV::Url>& a_url, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete = std::function<void(std::shared_ptr<DownloadRequest>)>(), std::function<void()> a_onAllComplete = std::function<void()>());
}
#endif

download.cpp

#include "download.h"
#include <boost/filesystem.hpp>
#include <atomic>

namespace MV{
    // Parses an HTTP status line and the header fields that follow it.
    //
    // Reads "<version> <status> <message>" and then "Name: value" lines up to the
    // first empty line (or the end of the stream). Field names are stored
    // lower-cased in `values`; a repeated field overwrites the earlier one.
    // `contentLength` is taken from the Content-Length field and is 0 when the
    // field is absent or not a non-negative number.
    void HttpHeader::read(std::istream& response_stream) {
        // Start clean so a reused object (e.g. after a redirect) does not keep
        // data that belonged to the previous response.
        values.clear();
        version.clear();
        contentLength = 0;

        response_stream >> version;
        std::string status_code;
        response_stream >> status_code;
        try {
            status = std::stoi(status_code);
        } catch (...) {
            status = 0;
        }

        getline_platform_agnostic(response_stream, message);

        // Drop the single space that separates the status code from the message.
        if (!message.empty() && message[0] == ' ') { message.erase(0, 1); }

        std::string header;
        while (getline_platform_agnostic(response_stream, header) && !header.empty()) {
            const auto index = header.find(':');
            if (index == std::string::npos || index == 0) {
                continue; // not a "Name: value" line
            }

            auto key = header.substr(0, index);
            // std::tolower requires a value representable as unsigned char.
            std::transform(key.begin(), key.end(), key.begin(), [](unsigned char c) { return static_cast<char>(std::tolower(c)); });

            // The value is whatever follows the colon, minus surrounding spaces
            // and tabs. The whitespace after the colon is optional, so it must
            // not be assumed to be exactly one character wide.
            std::string value;
            const auto first = header.find_first_not_of(" \t", index + 1);
            if (first != std::string::npos) {
                const auto last = header.find_last_not_of(" \t");
                value = header.substr(first, last - first + 1);
            }
            values[key] = value;

            if (key == "content-length") {
                // Require a leading digit: std::stoull would accept "-1" and
                // silently wrap it to a huge length.
                if (!value.empty() && value.front() >= '0' && value.front() <= '9') {
                    try {
                        contentLength = static_cast<size_t>(std::stoull(value));
                    } catch (const std::exception &e) {
                        std::cerr << e.what() << std::endl;
                        contentLength = 0;
                    }
                } else {
                    std::cerr << "Invalid Content-Length: " << value << std::endl;
                    contentLength = 0;
                }
            }
        }
    }

    // Blocking download of a_url into an in-memory stream. Returns the collected
    // text, or an empty string when the request did not succeed.
    std::string DownloadString(const Url& a_url) {
        const auto body = std::make_shared<std::stringstream>();
        const auto finished = DownloadRequest::make(a_url, body);
        return finished->header().success ? body->str() : std::string();
    }

    // Blocking download of a_url into the file a_path.
    //
    // Missing parent directories of a_path are created first. When the request
    // does not succeed the (partial) file is removed again. Returns the header
    // of the finished request, including its success flag and error message.
    MV::HttpHeader DownloadFile(const Url& a_url, const std::string &a_path) {
        HttpHeader header;
        {
            // A bare file name has an empty parent_path(), and Boost.Filesystem
            // treats an empty path passed to create_directories as an error.
            const auto directory = boost::filesystem::path(a_path).parent_path();
            if (!directory.empty()) {
                boost::filesystem::create_directories(directory);
            }
            auto outFile = std::make_shared<std::ofstream>(a_path, std::ofstream::out | std::ofstream::binary);
            auto request = DownloadRequest::make(a_url, outFile);
            header = request->header();
            // Leaving this scope releases the request and the stream, so the
            // file is closed before it may be removed below.
        }
        if (!header.success) {
            std::remove(a_path.c_str());
        }
        return header;
    }

    // Queues a download of a_url into the file a_path on a_ioService.
    //
    // Missing parent directories of a_path are created first. When the request
    // finishes unsuccessfully the file is removed; after that a_onComplete (if
    // set) is invoked with the request.
    void DownloadFile(const std::shared_ptr<boost::asio::io_service> &a_ioService, const MV::Url& a_url, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete) {
        // A bare file name has an empty parent_path(), and Boost.Filesystem
        // treats an empty path passed to create_directories as an error.
        const auto directory = boost::filesystem::path(a_path).parent_path();
        if (!directory.empty()) {
            boost::filesystem::create_directories(directory);
        }
        auto outFile = std::make_shared<std::ofstream>(a_path, std::ofstream::out | std::ofstream::binary);
        // The returned handle is not needed here: the pending handlers hold
        // their own shared_ptr to the request.
        DownloadRequest::make(a_ioService, a_url, outFile, [a_path, a_onComplete](std::shared_ptr<DownloadRequest> a_result) {
            if (!a_result->header().success) {
                std::remove(a_path.c_str());
            }
            if (a_onComplete) { a_onComplete(a_result); }
        });
    }

    // Downloads every URL in a_urls on a private io_service and returns once
    // that service has finished running. Each file is stored at a_path with the
    // file name of the URL's path appended directly.
    void DownloadFiles(const std::vector<MV::Url>& a_urls, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete) {
        const auto ioService = std::make_shared<boost::asio::io_service>();
        for (const auto& sourceUrl : a_urls) {
            const std::string fileName = boost::filesystem::path(sourceUrl.path()).filename().string();
            DownloadFile(ioService, sourceUrl, a_path + fileName, a_onComplete);
        }
        ioService->run();
    }
    // Queues one download per URL on a_ioService (the service is not run here).
    //
    // a_onComplete, when set, is invoked for every finished request.
    // a_onAllComplete, when set, is invoked once after the last request has
    // reported completion; for an empty URL list it is invoked immediately,
    // because there is nothing to wait for.
    void DownloadFiles(const std::shared_ptr<boost::asio::io_service> &a_ioService, const std::vector<MV::Url>& a_urls, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete, std::function<void()> a_onAllComplete) {
        const size_t totalFiles = a_urls.size();
        if (totalFiles == 0) {
            if (a_onAllComplete) { a_onAllComplete(); }
            return;
        }

        // A single counter shared by all downloads. With one counter per
        // download the count could never exceed 1, so the "all complete"
        // notification would only ever fire for a one-element list.
        const auto completed = std::make_shared<std::atomic<size_t>>(0);
        for (const auto& url : a_urls) {
            DownloadFile(a_ioService, url, a_path + boost::filesystem::path(url.path()).filename().string(), [a_onComplete, a_onAllComplete, completed, totalFiles](std::shared_ptr<DownloadRequest> a_request) {
                // Both callbacks default to empty std::function objects.
                if (a_onComplete) { a_onComplete(a_request); }
                if (++(*completed) == totalFiles && a_onAllComplete) {
                    a_onAllComplete();
                }
            });
        }
    }

    // Completion handler for the body read started in handleReadHeaders.
    //
    // Success and EOF both end the download: whatever is buffered is written to
    // the output stream and onComplete is notified. Any other error marks the
    // request as failed before notifying onComplete.
    void DownloadRequest::handleReadContent(const boost::system::error_code& err) {
        if (!err || err == boost::asio::error::eof) {
            // The request is sent with "Connection: close", so the server ends
            // the body by closing the connection. EOF therefore has to flush the
            // received data and report completion rather than being ignored.
            if (response->size() > 0) {
                readResponseToStream();
            }
            if (onComplete) { onComplete(shared_from_this()); }
        } else {
            headerData.success = false;
            headerData.errorMessage = "Download Read Content Failure: " + err.message();
            std::cerr << headerData.errorMessage << std::endl;
            if (onComplete) { onComplete(shared_from_this()); }
        }
    }

    // Completion handler for the header read started in handleWriteRequest.
    //
    // Parses the response header, follows a redirect (at most 32 bounces) when
    // the status is 3xx and a Location field is present, and otherwise writes
    // any body bytes already buffered and schedules the read of the remainder.
    void DownloadRequest::handleReadHeaders(const boost::system::error_code& err) {
        if (err) {
            headerData.success = false;
            headerData.errorMessage = "Download Read Header Failure: " + err.message();
            std::cerr << headerData.errorMessage << std::endl;
            if (onComplete) { onComplete(shared_from_this()); }
            return;
        }

        responseStream = std::make_unique<std::istream>(response.get());

        headerData.read(*responseStream);
        headerData.success = true;
        headerData.errorMessage = "";

        const auto location = headerData.values.find("location");
        const bool isRedirect = headerData.status >= 300 && headerData.status < 400;
        if (isRedirect && headerData.bounces.size() < 32 && location != headerData.values.end()) {
            headerData.bounces.push_back(currentUrl.toString());
            // initiateRequest() replaces the response buffer; drop the stream
            // first so it never refers to a destroyed streambuf.
            responseStream.reset();
            initiateRequest(location->second);
            return;
        }

        // async_read_until may have buffered body bytes past the header
        // terminator; they count towards the announced content length.
        const size_t alreadyBuffered = response->size();
        if (alreadyBuffered > 0) {
            readResponseToStream();
        }

        if (headerData.values.find("content-length") == headerData.values.end()) {
            // No announced length: the request asked for "Connection: close", so
            // the body is delimited by the server closing the connection. Keep
            // reading until the socket reports an error such as EOF.
            boost::asio::async_read(*socket, *response, boost::asio::transfer_all(), boost::bind(&DownloadRequest::handleReadContent, shared_from_this(), boost::asio::placeholders::error));
            return;
        }

        // Unsigned arithmetic: subtracting without this check would wrap around
        // to a huge value whenever more than contentLength bytes are buffered.
        const size_t amountLeftToRead = headerData.contentLength > alreadyBuffered ? headerData.contentLength - alreadyBuffered : 0;
        if (amountLeftToRead > 0) {
            boost::asio::async_read(*socket, *response, boost::asio::transfer_at_least(amountLeftToRead), boost::bind(&DownloadRequest::handleReadContent, shared_from_this(), boost::asio::placeholders::error));
        } else {
            if (onComplete) { onComplete(shared_from_this()); }
        }
    }

    // Completion handler for sending the request. On success it starts reading
    // the response up to the blank line that terminates the header; on failure
    // it records the error and notifies onComplete.
    void DownloadRequest::handleWriteRequest(const boost::system::error_code& err) {
        if (err) {
            headerData.success = false;
            headerData.errorMessage = "Download Write Failure: " + err.message();
            std::cerr << headerData.errorMessage << std::endl;
            if (onComplete) { onComplete(shared_from_this()); }
            return;
        }

        auto onHeadersRead = boost::bind(&DownloadRequest::handleReadHeaders, shared_from_this(), boost::asio::placeholders::error);
        boost::asio::async_read_until(*socket, *response, "\r\n\r\n", onHeadersRead);
    }

    // Completion handler for a connection attempt. A successful connection
    // sends the request; a failed one moves on to the next resolved endpoint,
    // and once the endpoints are exhausted the failure is reported.
    void DownloadRequest::handleConnect(const boost::system::error_code& err, boost::asio::ip::tcp::resolver::iterator endpoint_iterator) {
        if (!err) {
            // Connected: send the prepared request.
            boost::asio::async_write(*socket, *request, boost::bind(&DownloadRequest::handleWriteRequest, shared_from_this(), boost::asio::placeholders::error));
            return;
        }

        const boost::asio::ip::tcp::resolver::iterator endOfList;
        if (endpoint_iterator == endOfList) {
            headerData.success = false;
            headerData.errorMessage = "Download Connection Failure: " + err.message();
            std::cerr << headerData.errorMessage << std::endl;
            if (onComplete) { onComplete(shared_from_this()); }
            return;
        }

        // This endpoint failed; close the socket and try the next one.
        socket->close();
        const boost::asio::ip::tcp::endpoint nextEndpoint = *endpoint_iterator;
        socket->async_connect(nextEndpoint, boost::bind(&DownloadRequest::handleConnect, shared_from_this(), boost::asio::placeholders::error, ++endpoint_iterator));
    }

    // Completion handler for host name resolution. On success it starts
    // connecting to the first endpoint (handleConnect walks through the rest);
    // on failure it records the error and notifies onComplete.
    void DownloadRequest::handleResolve(const boost::system::error_code& err, boost::asio::ip::tcp::resolver::iterator endpoint_iterator) {
        if (err) {
            headerData.success = false;
            headerData.errorMessage = "Download Resolve Failure: " + err.message();
            std::cerr << headerData.errorMessage << std::endl;
            if (onComplete) { onComplete(shared_from_this()); }
            return;
        }

        // Begin with the first endpoint; the iterator passed along already
        // points at the one to try next should this attempt fail.
        const boost::asio::ip::tcp::endpoint firstEndpoint = *endpoint_iterator;
        socket->async_connect(firstEndpoint, boost::bind(&DownloadRequest::handleConnect, shared_from_this(), boost::asio::placeholders::error, ++endpoint_iterator));
    }

    // Starts a new GET request for a_url: closes the socket, replaces the
    // request/response buffers, composes the request text and begins resolving
    // the host for the "http" service.
    void DownloadRequest::initiateRequest(const MV::Url& a_url) {
        using boost::asio::ip::tcp;

        socket->close();
        currentUrl = a_url;
        request = std::make_unique<boost::asio::streambuf>();
        response = std::make_unique<boost::asio::streambuf>();

        std::ostream requestStream(request.get());
        requestStream << "GET " << a_url.pathAndQuery() << " HTTP/1.1\r\n";
        requestStream << "Host: " << a_url.host() << "\r\n";
        requestStream << "Accept: */*\r\n"
                      << "Connection: close\r\n\r\n";

        const tcp::resolver::query hostQuery(a_url.host(), "http");
        resolver->async_resolve(hostQuery, boost::bind(&DownloadRequest::handleResolve, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::iterator));
    }

    // Creates the resolver and socket, creating an io_service first when none
    // was supplied. Returns true when the io_service was created here.
    bool DownloadRequest::initializeSocket() {
        const bool created = !ioService;
        if (created) {
            ioService = std::make_shared<boost::asio::io_service>();
        }

        resolver = std::make_unique<boost::asio::ip::tcp::resolver>(*ioService);
        socket = std::make_unique<boost::asio::ip::tcp::socket>(*ioService);

        return created;
    }

    // Entry point used by make(): remembers the original URL, sets up the
    // socket and starts the request. The io_service is only run here when this
    // request had to create it. Any exception is converted into a failed header.
    void DownloadRequest::perform(const MV::Url& a_url) {
        originalUrl = a_url;
        try {
            const bool needToCallRun = initializeSocket();
            initiateRequest(a_url);
            if (needToCallRun) {
                ioService->run();
            }
        } catch (...) {
            headerData.success = false;
            headerData.errorMessage = "Exception thrown to top level.";
            std::cerr << headerData.errorMessage << std::endl;
            // onComplete is empty for the blocking make() overload; calling an
            // empty std::function would throw std::bad_function_call from here.
            if (onComplete) { onComplete(shared_from_this()); }
        }
    }

}

generalUtility.h (part of it anyway, just for this reference)

// Reads one line from `is` into `t`, accepting "\n", "\r\n" or a lone "\r" as
// the line terminator. The terminator is consumed but not stored. When the end
// of input is reached before any character was read, eofbit is set on `is`.
inline std::istream& getline_platform_agnostic(std::istream& is, std::string& t) {
    t.clear();

    // Characters are pulled straight from the stream buffer; the sentry object
    // guards that direct access.
    std::istream::sentry guard(is, true);
    std::streambuf* const buffer = is.rdbuf();

    while (true) {
        const int ch = buffer->sbumpc();

        if (ch == EOF) {
            // A final line without a terminator is still returned as a line;
            // only a read that produced nothing marks the stream as exhausted.
            if (t.empty()) {
                is.setstate(std::ios::eofbit);
            }
            break;
        }
        if (ch == '\n') {
            break;
        }
        if (ch == '\r') {
            // Swallow the '\n' of a "\r\n" pair.
            if (buffer->sgetc() == '\n') {
                buffer->sbumpc();
            }
            break;
        }

        t += static_cast<char>(ch);
    }
    return is;
}

// Returns a copy of `s` with every character converted by std::tolower.
inline std::string toLower(std::string s) {
    // std::tolower is only defined for values representable as unsigned char
    // (or EOF); taking the character as unsigned char keeps bytes >= 0x80 from
    // being passed as negative ints.
    std::transform(s.begin(), s.end(), s.begin(), [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
    return s;
}

url.h

Modified (slightly, just changed some naming scheme stuff)

https://github.com/keyz182/Poco-1.4.3/blob/master/Foundation/include/Poco/URI.h

https://github.com/keyz182/Poco-1.4.3/blob/master/Foundation/src/URI.cpp

查看更多
登录 后发表回答