How to incrementally parse (and act on) a large file with Boost.Spirit Qi

Posted 2019-02-18 12:27

I've created a Qi parser for a custom text file format. There are tens of thousands of entries to process, and each entry usually has between 1 and 10 subentries. I've put a trimmed-down working example of my parser here.

#include <boost/config/warning_disable.hpp>
#include <boost/spirit/include/qi.hpp>
#include <boost/spirit/include/phoenix_core.hpp>
#include <boost/spirit/include/phoenix_operator.hpp>
#include <boost/spirit/include/phoenix_fusion.hpp>
#include <boost/spirit/include/phoenix_stl.hpp>
#include <boost/spirit/include/phoenix_object.hpp>
#include <boost/spirit/include/support_istream_iterator.hpp>
#include <boost/fusion/include/adapt_struct.hpp>
#include <boost/fusion/include/io.hpp>

#include <fstream>
#include <iostream>
#include <string>

using std::string;
using std::vector;
using std::cout;
using std::endl;

namespace model
{
    namespace qi = boost::spirit::qi;

    /// One `SPEC` line of a cluster.
    ///
    /// The numeric members have default initialisers so that a default-constructed
    /// spectrum (e.g. one left behind by a failed parse) never holds indeterminate values.
    struct spectrum
    {
        string comment;                   // free text between '#' and "File:"
        string file;                      // quoted value after "File:"
        string nativeId;                  // quoted value after "NativeID:"
        double precursorMz = 0.0;
        int precursorCharge = 0;
        double precursorIntensity = 0.0;
    };

    /// One `=Cluster=` section: the text after `id=` and its spectra.
    struct cluster
    {
        string id;
        vector<spectrum> spectra;
    };

    /// A whole file: the text after `name=` followed by all clusters.
    struct clustering
    {
        string name;
        vector<cluster> clusters;
    };
}

// Tell fusion about the data structures to make them first-class fusion citizens.
// Must be at global scope.
//
// Members are listed in declaration order. Spirit fills an adapted struct
// positionally from the attributes of the rule that produces it, so this order
// has to line up with the attribute-producing parsers in the grammar
// (e.g. spectrum_start must yield exactly: string, string, string, double, int, double).

BOOST_FUSION_ADAPT_STRUCT(
    model::spectrum,
    (string, comment)
    (string, file)
    (string, nativeId)
    (double, precursorMz)
    (int, precursorCharge)
    (double, precursorIntensity)
)

// Produced by cluster_start: the id text, then the list of spectra.
BOOST_FUSION_ADAPT_STRUCT(
    model::cluster,
    (string, id)
    (std::vector<model::spectrum>, spectra)
)

// Produced by the top-level clusters rule: the name text, then all clusters.
BOOST_FUSION_ADAPT_STRUCT(
    model::clustering,
    (string, name)
    (std::vector<model::cluster>, clusters)
)

namespace {
    /// Phoenix-callable error reporter intended for `qi::on_error<fail>` handlers
    /// (currently only referenced by the commented-out on_error lines in the grammar).
    ///
    /// Prints what the parser expected, the complete input line that contains the
    /// error position, and a caret line pointing at the error column.
    struct ReportError
    {
      // Nested result template: tells Phoenix / result_of that the call returns void.
      template<typename, typename, typename, typename> struct result { typedef void type; };

      /// @param first_iter  start of the input seen by the failing rule
      /// @param last_iter   end of the input
      /// @param error_iter  position at which the failure occurred
      /// @param what        description of the component that failed to match
      template<typename Iter>
      void operator()(Iter first_iter, Iter last_iter,
                      Iter error_iter, const boost::spirit::qi::info& what) const
      {
        // Start of the error line: the character after the last '\n' before the
        // error. Scanning forward keeps this valid for forward-only iterators
        // such as multi_pass.
        Iter line_start = first_iter;
        for (Iter it = first_iter; it != error_iter; ++it)
        {
          if (*it == '\n')
          {
            line_start = it;
            ++line_start;
          }
        }

        // End of the error line: stop at the next '\n' so that only this line is
        // copied, not the entire remaining (possibly huge) input.
        Iter line_end = error_iter;
        while (line_end != last_iter && *line_end != '\n')
          ++line_end;

        const std::string error_line(line_start, line_end);

        // Caret marker; tabs are preserved so the '^' lines up under
        // tab-separated fields.
        std::string marker;
        for (Iter it = line_start; it != error_iter; ++it)
          marker += (*it == '\t') ? '\t' : ' ';
        marker += '^';

        std::cerr << "Error parsing in " << what << '\n'
                  << error_line << '\n'
                  << marker << std::endl;
      }
    };

    const boost::phoenix::function<ReportError> report_error = ReportError();
}

namespace model
{
    /// Spirit.Qi grammar for a complete clustering file.
    ///
    /// Expected layout:
    ///   name=<text>
    ///   <empty line>
    ///   =Cluster=
    ///   id=<text>
    ///   SPEC #<comment> File:"<file>", NativeID:"<id>" <bool> <double> <int> <double>
    ///   ... (one or more SPEC lines per cluster, one or more clusters)
    ///
    /// Spaces and tabs are skipped (qi::blank); line breaks are matched explicitly
    /// with eol. The `>` expectation operators make most mismatches throw
    /// qi::expectation_failure<Iterator> instead of returning false.
    template <typename Iterator>
    struct cluster_parser : qi::grammar<Iterator, clustering(), qi::blank_type>
    {
        cluster_parser() : cluster_parser::base_type(clusters)
        {
            using qi::int_;
            using qi::lit;
            using qi::double_;
            using qi::bool_;
            using qi::omit;
            using qi::lexeme;
            using qi::eol;
            using qi::ascii::char_;
            using qi::on_error;
            using qi::fail;
            using namespace qi::labels;
            using boost::phoenix::construct;
            using boost::phoenix::val;

            // Double-quoted text; lexeme[] keeps embedded blanks.
            quoted_string %= lexeme['"' > +(char_ - '"') > '"'];

            // The boolean column has no field in `spectrum`, so it is parsed but
            // omitted. Without omit[] Spirit assigns the attributes positionally:
            // the flag lands in precursorMz, the m/z in precursorCharge, the charge
            // in precursorIntensity, and the final double has no field left.
            // If the flag is needed, add a bool member to spectrum (and its fusion
            // adaptation) instead.
            // NOTE(review): +(char_ - "File:") is not inside lexeme[], so blanks
            // within the comment are skipped and not stored.
            spectrum_start %=
                lit("SPEC") >
                "#" > +(char_ - "File:") >
                "File:" > quoted_string > lit(",") >
                "NativeID:" > quoted_string >
                omit[bool_] > double_ > int_ > double_;

            cluster_start %= 
                "=Cluster=" > eol >
                "id=" > +(char_ - eol) > eol >
                spectrum_start % eol;

            // The trailing *eol accepts the line break(s) that text files normally
            // end with. Without it, parsing stops right before them and the caller's
            // `begin == end` check reports a failure.
            clusters %= 
                "name=" > +(char_ - eol) > eol >
                eol >
                cluster_start % eol >
                *eol;

            BOOST_SPIRIT_DEBUG_NODES((clusters)(cluster_start)(quoted_string)(spectrum_start))

            //on_error<fail>(clusters, report_error(_1, _2, _3, _4));
            //on_error<fail>(cluster_start, report_error(_1, _2, _3, _4));
            //on_error<fail>(spectrum_start, report_error(_1, _2, _3, _4));
            //on_error<fail>(quoted_string, report_error(_1, _2, _3, _4));

            // on_success(cluster_start, quantify_cluster(_1, _2, _3, _4)); ??
        }

        qi::rule<Iterator, std::string(), qi::blank_type> quoted_string;
        qi::rule<Iterator, cluster(), qi::blank_type> cluster_start;
        qi::rule<Iterator, spectrum(), qi::blank_type> spectrum_start;
        qi::rule<Iterator, clustering(), qi::blank_type> clusters;
    };
}

int main()
{
    using namespace model;

    cluster_parser<boost::spirit::istream_iterator> g; // Our grammar
    string str;
    //std::ifstream input("c:/test/Mo_tai.clustering");
    std::istringstream input("name=GreedyClustering_0.99\n"
"\n"
"=Cluster=\n"
"id=9c8c5830-5841-4f77-b819-64180509615b\n"
"SPEC\t#file=w:\\test\\Mo_Tai_iTRAQ_f4.mgf#id=index=219#title=Mo_Tai_iTRAQ_f4.1254.1254.2 File:\"Mo_Tai_iTRAQ_f4.raw\", NativeID:\"controllerType=0 controllerNumber=1 scan=1254\"\ttrue\t\t300.1374\t2\t\t\t0.0\n"
"=Cluster=\n"
"id=f8f384a1-3d5f-4af1-9581-4d03a5aa3342\n"
"SPEC\t#file=w:\\test\\Mo_Tai_iTRAQ_f9.mgf#id=index=560#title=Mo_Tai_iTRAQ_f9.1666.1666.3 File:\"Mo_Tai_iTRAQ_f9.raw\", NativeID:\"controllerType=0 controllerNumber=1 scan=1666\"\ttrue\t\t300.14413\t3\t\t\t0.0\n"
"SPEC\t#file=w:\\test\\Mo_Tai_iTRAQ_f9.mgf#id=index=520#title=Mo_Tai_iTRAQ_f9.1621.1621.3 File:\"Mo_Tai_iTRAQ_f9.raw\", NativeID:\"controllerType=0 controllerNumber=1 scan=1621\"\ttrue\t\t300.14197\t3\t\t\t0.0\n"
"=Cluster=\n"
"id=b84b79e1-44bc-44c0-a9af-5391ca02582d\n"
"SPEC\t#file=w:\\test\\Mo_Tai_iTRAQ_f2.mgf#id=index=7171#title=Mo_Tai_iTRAQ_f2.12729.12729.2 File:\"Mo_Tai_iTRAQ_f2.raw\", NativeID:\"controllerType=0 controllerNumber=1 scan=12729\"\ttrue\t\t300.15695\t2\t\t\t0.0");
    input.unsetf(std::ios::skipws);
    boost::spirit::istream_iterator begin(input);
    boost::spirit::istream_iterator end;

    clustering clusteringResults;
    bool r = phrase_parse(begin, end, g, qi::blank, clusteringResults);

    if (r && begin == end)
    {
        cout << "Parsing succeeded (" << clusteringResults.clusters.size() << " clusters)\n";

        /*for (size_t i = 0; i < std::min((size_t)10, clusteringResults.clusters.size()); ++i)
        {
            cluster& c = clusteringResults.clusters[i];
            cout << "Cluster " << c.id << " - avg. precursor m/z: " << c.avgPrecursorMz << ", num. spectra: " << c.spectra.size() << endl;
        }*/
        return 1;
    }
    else
    {
        std::cout << "Parsing failed (" << clusteringResults.clusters.size() << " clusters)\n";

        if (!clusteringResults.clusters.empty())
        {
            cluster& c = clusteringResults.clusters.back();
            cout << "Last cluster parsed " << c.id << ", num. spectra: " << c.spectra.size() << endl;
        }
        return 1;
    }
}

I don't want to parse the entire file into memory before processing it. How can I make it queue up an entry (cluster) for processing after each cluster is finished parsing, delete the cluster after processing, then continue parsing? Even better would be to have another thread handle the processing asynchronously.

1 answer
放我归山
Reply #2 · 2019-02-18 12:45

Just use streaming iterators.

Or operate on a memory-mapped file.

On the processing side, push actions onto a queue from inside a semantic action.

Note: you could run into a suspected bug in which the multi_pass backtrack buffers are not cleared properly; you might want to check for this and take the preventative measures described in this answer: Boost spirit memory leak using flush_multi_pass

Live Demo

#include <boost/fusion/include/adapt_struct.hpp>
#include <boost/spirit/include/qi.hpp>
#include <boost/spirit/include/phoenix.hpp>
#include <boost/fusion/include/io.hpp>

namespace model
{
    namespace qi = boost::spirit::qi;
    namespace px = boost::phoenix;

    /// One `SPEC` line of a cluster.
    ///
    /// The numeric members have default initialisers so that a default-constructed
    /// spectrum never holds indeterminate values.
    struct spectrum {
        std::string comment;
        std::string file;
        std::string nativeId;
        double      precursorMz        = 0.0;
        int         precursorCharge    = 0;
        double      precursorIntensity = 0.0;
    };

    /// One `=Cluster=` section: the text after `id=` and its spectra.
    struct cluster {
        std::string           id;
        std::vector<spectrum> spectra;
    };
}

// Register the model structs as Fusion sequences. Spirit fills the members
// positionally, in the order listed here, from the grammar's sequence attributes.
BOOST_FUSION_ADAPT_STRUCT(
    model::spectrum,
    (std::string, comment)
    (std::string, file)
    (std::string, nativeId)
    (double, precursorMz)
    (int, precursorCharge)
    (double, precursorIntensity)
)
BOOST_FUSION_ADAPT_STRUCT(
    model::cluster,
    (std::string, id)
    (std::vector<model::spectrum>, spectra)
)

namespace model
{
    /// Spirit.Qi grammar that hands each cluster to a callback as soon as the
    /// cluster has been parsed, rather than collecting all clusters in memory.
    ///
    /// @param handler  invoked once per parsed cluster with (clustering name, cluster).
    ///
    /// The top-level `start` rule has no skipper; it applies the blank skipper
    /// itself via skip(blank)[...]. The `>` expectation operators make most
    /// mismatches throw qi::expectation_failure<Iterator>.
    template <typename Iterator>
    struct cluster_parser : qi::grammar<Iterator>
    {
        cluster_parser(std::function<void(std::string const&, model::cluster const&)> handler) 
            :   cluster_parser::base_type(start),
                submit_(handler)
        {
            using namespace qi;

            quoted_string %= lexeme['"' > +(char_ - '"') > '"'];

            // The boolean column has no field in `spectrum`, so it is parsed but
            // omitted. Without omit[] Spirit assigns the attributes positionally:
            // the flag lands in precursorMz, the m/z in precursorCharge, the charge
            // in precursorIntensity, and the final double has no field left.
            spectrum_start %=
                lit("SPEC") >
                "#" > +(char_ - "File:") >
                "File:" > quoted_string > lit(",") >
                "NativeID:" > quoted_string >
                omit[bool_] > double_ > int_ > double_;

            cluster_start %= 
                "=Cluster=" > eol >
                "id=" > +(char_ - eol) > eol >
                spectrum_start % eol;

            // The name is kept in the rule local `name_` so it can be passed to the
            // handler along with every cluster. The trailing *eol accepts the line
            // break(s) that text files normally end with; otherwise parsing stops
            // before them and `begin == end` fails.
            clusters %= 
                "name=" > qi::as_string[ +(char_ - eol) ][ name_ = _1 ] > eol > eol >
                cluster_start [ submit_(name_, _1) ] % eol >
                *eol;

            start = skip(blank) [clusters];

            BOOST_SPIRIT_DEBUG_NODES((start)(clusters)(cluster_start)(quoted_string)(spectrum_start))
        }
      private:
        qi::_a_type name_;
        px::function<std::function<void(std::string const&, model::cluster const&)> > submit_;

        qi::rule<Iterator, std::string(), qi::blank_type> quoted_string;
        qi::rule<Iterator, cluster(), qi::blank_type> cluster_start;
        qi::rule<Iterator, spectrum(), qi::blank_type> spectrum_start;
        qi::rule<Iterator, qi::locals<std::string>, qi::blank_type> clusters;
        qi::rule<Iterator> start;
    };
}

int main()
{
    using namespace model;

    cluster_parser<boost::spirit::istream_iterator> g([&](auto const&...){std::cout << "handled\n";}); // Our grammar
    std::string str;
    //std::ifstream input("c:/test/Mo_tai.clustering");

    std::istringstream input(R"(name=GreedyClustering_0.99

=Cluster=
id=9c8c5830-5841-4f77-b819-64180509615b
SPEC    #file=w:\test\Mo_Tai_iTRAQ_f4.mgf#id=index=219#title=Mo_Tai_iTRAQ_f4.1254.1254.2 File:"Mo_Tai_iTRAQ_f4.raw", NativeID:"controllerType=0 controllerNumber=1 scan=1254"   true        300.1374    2           0.0
=Cluster=
id=f8f384a1-3d5f-4af1-9581-4d03a5aa3342
SPEC    #file=w:\test\Mo_Tai_iTRAQ_f9.mgf#id=index=560#title=Mo_Tai_iTRAQ_f9.1666.1666.3 File:"Mo_Tai_iTRAQ_f9.raw", NativeID:"controllerType=0 controllerNumber=1 scan=1666"   true        300.14413   3           0.0
SPEC    #file=w:\test\Mo_Tai_iTRAQ_f9.mgf#id=index=520#title=Mo_Tai_iTRAQ_f9.1621.1621.3 File:"Mo_Tai_iTRAQ_f9.raw", NativeID:"controllerType=0 controllerNumber=1 scan=1621"   true        300.14197   3           0.0
=Cluster=
id=b84b79e1-44bc-44c0-a9af-5391ca02582d
SPEC    #file=w:\test\Mo_Tai_iTRAQ_f2.mgf#id=index=7171#title=Mo_Tai_iTRAQ_f2.12729.12729.2 File:"Mo_Tai_iTRAQ_f2.raw", NativeID:"controllerType=0 controllerNumber=1 scan=12729"   true        300.15695   2           0.0)");
    input.unsetf(std::ios::skipws);
    boost::spirit::istream_iterator begin(input);
    boost::spirit::istream_iterator end;

    bool r = phrase_parse(begin, end, g, qi::blank);

    if (r && begin == end) {
        std::cout << "Parsing succeeded\n";
    }
    else {
        std::cout << "Parsing failed\n";
    }

    if (begin!=end) {
        std::cout << "Unparsed remaining input: '" << std::string(begin, end) << "\n";
    }

    return (r && begin==end)? 0 : 1;
}

Prints

handled
handled
handled
Parsing succeeded

BONUS: Threaded workers

Here's a version that dispatches the clusters for asynchronous processing on a thread pool.

Note that the `submit` method posts a lambda to the service. The lambda captures its arguments by value because they must stay alive for the whole asynchronous processing, long after `submit` has returned.

Live On Coliru

#include <boost/asio.hpp>
#include <boost/thread.hpp>
namespace ba = boost::asio;

/// Thread pool that processes parsed clusters asynchronously.
///
/// Runs `_svc` on one worker thread per reported hardware thread (at least one).
/// `submit` copies its arguments into the posted job, so the caller's objects may
/// go away as soon as `submit` returns. The destructor releases the work guard and
/// joins the workers, so already-submitted jobs are drained before destruction.
struct Processing {
    Processing() {
        // hardware_concurrency() returns 0 when the value is unavailable; with zero
        // workers nothing would ever run the posted jobs.
        unsigned workers = boost::thread::hardware_concurrency();
        if (workers == 0)
            workers = 1;
        for (unsigned i = 0; i < workers; ++i)
            _threads.create_thread([this] { _svc.run(); });
    }

    ~Processing() {
        _work.reset();          // let run() return once the queue is empty
        _threads.join_all();
    }

    void submit(std::string const& name, model::cluster const& cluster) {
        // Copy the arguments explicitly: the referenced objects belong to the caller
        // (here, the parser), and the job runs later on another thread.
        _svc.post([this, name, cluster] { do_processing(name, cluster); });
    }

  private:
    void do_processing(std::string const& name, model::cluster const& cluster) {
        std::cout << "Thread " << boost::this_thread::get_id() << ": " << name << " cluster of " << cluster.spectra.size() << " spectra\n";
        boost::this_thread::sleep_for(boost::chrono::milliseconds(950));
    }

    ba::io_service _svc;
    boost::optional<ba::io_service::work> _work = ba::io_service::work(_svc);
    boost::thread_group _threads;
};

[...snip...] And in `main`:

Processing processing;
auto handler = [&processing](auto&... args) { processing.submit(args...); };

cluster_parser<boost::spirit::istream_iterator> g(handler); // Our grammar

The rest is unmodified, and now it prints (e.g.):

Thread 7f0144a5b700: GreedyClustering_0.99 cluster of 1 spectra
Thread 7f014425a700: GreedyClustering_0.99 cluster of 2 spectra
Parsing succeeded
Thread 7f0143a59700: GreedyClustering_0.99 cluster of 1 spectra
View more
Log in to post an answer