How to incrementally parse (and act on) a large fi

2019-02-18 12:27发布

I've created a Qi parser for a custom text file format. There are tens of thousands of entries to process and each entry usually has between 1-10 subentries. I put a trimmed down working example of my parser here.

#include <boost/config/warning_disable.hpp>
#include <boost/spirit/include/qi.hpp>
#include <boost/spirit/include/phoenix_core.hpp>
#include <boost/spirit/include/phoenix_operator.hpp>
#include <boost/spirit/include/phoenix_fusion.hpp>
#include <boost/spirit/include/phoenix_stl.hpp>
#include <boost/spirit/include/phoenix_object.hpp>
#include <boost/spirit/include/support_istream_iterator.hpp>
#include <boost/fusion/include/adapt_struct.hpp>
#include <boost/fusion/include/io.hpp>

#include <fstream>
#include <iostream>
#include <string>

using std::string;
using std::vector;
using std::cout;
using std::endl;

namespace model
    namespace qi = boost::spirit::qi;

    struct spectrum
        string comment;
        string file;
        string nativeId;
        double precursorMz;
        int precursorCharge;
        double precursorIntensity;

    struct cluster
        string id;
        vector<spectrum> spectra;

    struct clustering
        string name;
        vector<cluster> clusters;

// Tell fusion about the data structures to make them first-class fusion citizens.
// Must be at global scope.

    (string, comment)
    (string, file)
    (string, nativeId)
    (double, precursorMz)
    (int, precursorCharge)
    (double, precursorIntensity)

    (string, id)
    (std::vector<model::spectrum>, spectra)

    (string, name)
    (std::vector<model::cluster>, clusters)

namespace {
    struct ReportError
      template<typename, typename, typename, typename> struct result { typedef void type; };

      // contract the string to the surrounding new-line characters
      template<typename Iter>
      void operator()(Iter first_iter, Iter last_iter,
                      Iter error_iter, const boost::spirit::qi::info& what) const
        std::string first(first_iter, error_iter);
        std::string last(error_iter, last_iter);
        auto first_pos = first.rfind('\n');
        auto last_pos = last.find('\n');
        auto error_line = ((first_pos == std::string::npos) ? first
                            : std::string(first, first_pos + 1))
                          + std::string(last, 0, last_pos);
        //auto error_pos = (error_iter - first_iter) + 1;
        /*auto error_pos = error
        if (first_pos != std::string::npos)
          error_pos -= (first_pos + 1);*/

        std::cerr << "Error parsing in " << what << std::endl
                  << error_line << std::endl
                  //<< std::setw(error_pos) << '^'
                  << std::endl;

    const boost::phoenix::function<ReportError> report_error = ReportError();

namespace model
    template <typename Iterator>
    struct cluster_parser : qi::grammar<Iterator, clustering(), qi::blank_type>
        cluster_parser() : cluster_parser::base_type(clusters)
            using qi::int_;
            using qi::lit;
            using qi::double_;
            using qi::bool_;
            using qi::lexeme;
            using qi::eol;
            using qi::ascii::char_;
            using qi::on_error;
            using qi::fail;
            using namespace qi::labels;
            using boost::phoenix::construct;
            using boost::phoenix::val;

            quoted_string %= lexeme['"' > +(char_ - '"') > '"'];

            spectrum_start %=
                lit("SPEC") >
                "#" > +(char_ - "File:") >
                "File:" > quoted_string > lit(",") >
                "NativeID:" > quoted_string >
                bool_ > double_ > int_ > double_;

            cluster_start %= 
                "=Cluster=" > eol >
                "id=" > +(char_ - eol) > eol >
                spectrum_start % eol;

            clusters %= 
                "name=" > +(char_ - eol) > eol >
                eol >
                cluster_start % eol;


            //on_error<fail>(clusters, report_error(_1, _2, _3, _4));
            //on_error<fail>(cluster_start, report_error(_1, _2, _3, _4));
            //on_error<fail>(spectrum_start, report_error(_1, _2, _3, _4));
            //on_error<fail>(quoted_string, report_error(_1, _2, _3, _4));

            // on_success(cluster_start, quantify_cluster(_1, _2, _3, _4)); ??

        qi::rule<Iterator, std::string(), qi::blank_type> quoted_string;
        qi::rule<Iterator, cluster(), qi::blank_type> cluster_start;
        qi::rule<Iterator, spectrum(), qi::blank_type> spectrum_start;
        qi::rule<Iterator, clustering(), qi::blank_type> clusters;

int main()
    using namespace model;

    cluster_parser<boost::spirit::istream_iterator> g; // Our grammar
    string str;
    //std::ifstream input("c:/test/Mo_tai.clustering");
    std::istringstream input("name=GreedyClustering_0.99\n"
"SPEC\t#file=w:\\test\\Mo_Tai_iTRAQ_f4.mgf#id=index=219#title=Mo_Tai_iTRAQ_f4.1254.1254.2 File:\"Mo_Tai_iTRAQ_f4.raw\", NativeID:\"controllerType=0 controllerNumber=1 scan=1254\"\ttrue\t\t300.1374\t2\t\t\t0.0\n"
"SPEC\t#file=w:\\test\\Mo_Tai_iTRAQ_f9.mgf#id=index=560#title=Mo_Tai_iTRAQ_f9.1666.1666.3 File:\"Mo_Tai_iTRAQ_f9.raw\", NativeID:\"controllerType=0 controllerNumber=1 scan=1666\"\ttrue\t\t300.14413\t3\t\t\t0.0\n"
"SPEC\t#file=w:\\test\\Mo_Tai_iTRAQ_f9.mgf#id=index=520#title=Mo_Tai_iTRAQ_f9.1621.1621.3 File:\"Mo_Tai_iTRAQ_f9.raw\", NativeID:\"controllerType=0 controllerNumber=1 scan=1621\"\ttrue\t\t300.14197\t3\t\t\t0.0\n"
"SPEC\t#file=w:\\test\\Mo_Tai_iTRAQ_f2.mgf#id=index=7171#title=Mo_Tai_iTRAQ_f2.12729.12729.2 File:\"Mo_Tai_iTRAQ_f2.raw\", NativeID:\"controllerType=0 controllerNumber=1 scan=12729\"\ttrue\t\t300.15695\t2\t\t\t0.0");
    boost::spirit::istream_iterator begin(input);
    boost::spirit::istream_iterator end;

    clustering clusteringResults;
    bool r = phrase_parse(begin, end, g, qi::blank, clusteringResults);

    if (r && begin == end)
        cout << "Parsing succeeded (" << clusteringResults.clusters.size() << " clusters)\n";

        /*for (size_t i = 0; i < std::min((size_t)10, clusteringResults.clusters.size()); ++i)
            cluster& c = clusteringResults.clusters[i];
            cout << "Cluster " << << " - avg. precursor m/z: " << c.avgPrecursorMz << ", num. spectra: " << c.spectra.size() << endl;
        return 1;
        std::cout << "Parsing failed (" << clusteringResults.clusters.size() << " clusters)\n";

        if (!clusteringResults.clusters.empty())
            cluster& c = clusteringResults.clusters.back();
            cout << "Last cluster parsed " << << ", num. spectra: " << c.spectra.size() << endl;
        return 1;

I don't want to parse the entire file into memory before processing it. How can I make it queue up an entry (cluster) for processing after each cluster is finished parsing, delete the cluster after processing, then continue parsing? Even better would be to have another thread handle the processing asynchronously.

2楼-- · 2019-02-18 12:45

Just use streaming iterators.

Or operate on a memory mapped file.

On the processing side, push actions onto a queue from inside a semantic action.

Note: you could run into a supposed bug that doesn't clear the backtrack buffers properly; You might want to check this and take preventative measures as described in this answer: Boost spirit memory leak using flush_multi_pass

Live Demo

#include <boost/fusion/include/adapt_struct.hpp>
#include <boost/spirit/include/qi.hpp>
#include <boost/spirit/include/phoenix.hpp>
#include <boost/fusion/include/io.hpp>

namespace model
    namespace qi = boost::spirit::qi;
    namespace px = boost::phoenix;

    struct spectrum {
        std::string comment;
        std::string file;
        std::string nativeId;
        double      precursorMz;
        int         precursorCharge;
        double      precursorIntensity;

    struct cluster {
        std::string           id;
        std::vector<spectrum> spectra;

BOOST_FUSION_ADAPT_STRUCT(model::spectrum, comment, file, nativeId, precursorMz, precursorCharge, precursorIntensity)
BOOST_FUSION_ADAPT_STRUCT(model::cluster, id, spectra)

namespace model
    template <typename Iterator>
    struct cluster_parser : qi::grammar<Iterator>
        cluster_parser(std::function<void(std::string const&, model::cluster const&)> handler) 
            :   cluster_parser::base_type(start),
            using namespace qi;

            quoted_string %= lexeme['"' > +(char_ - '"') > '"'];

            spectrum_start %=
                lit("SPEC") >
                "#" > +(char_ - "File:") >
                "File:" > quoted_string > lit(",") >
                "NativeID:" > quoted_string >
                bool_ > double_ > int_ > double_;

            cluster_start %= 
                "=Cluster=" > eol >
                "id=" > +(char_ - eol) > eol >
                spectrum_start % eol;

            clusters %= 
                "name=" > qi::as_string[ +(char_ - eol) ][ name_ = _1 ] > eol > eol >
                cluster_start [ submit_(name_, _1) ] % eol;

            start = skip(blank) [clusters];

        qi::_a_type name_;
        px::function<std::function<void(std::string const&, model::cluster const&)> > submit_;

        qi::rule<Iterator, std::string(), qi::blank_type> quoted_string;
        qi::rule<Iterator, cluster(), qi::blank_type> cluster_start;
        qi::rule<Iterator, spectrum(), qi::blank_type> spectrum_start;
        qi::rule<Iterator, qi::locals<std::string>, qi::blank_type> clusters;
        qi::rule<Iterator> start;

int main()
    using namespace model;

    cluster_parser<boost::spirit::istream_iterator> g([&](auto const&...){std::cout << "handled\n";}); // Our grammar
    std::string str;
    //std::ifstream input("c:/test/Mo_tai.clustering");

    std::istringstream input(R"(name=GreedyClustering_0.99

SPEC    #file=w:\test\Mo_Tai_iTRAQ_f4.mgf#id=index=219#title=Mo_Tai_iTRAQ_f4.1254.1254.2 File:"Mo_Tai_iTRAQ_f4.raw", NativeID:"controllerType=0 controllerNumber=1 scan=1254"   true        300.1374    2           0.0
SPEC    #file=w:\test\Mo_Tai_iTRAQ_f9.mgf#id=index=560#title=Mo_Tai_iTRAQ_f9.1666.1666.3 File:"Mo_Tai_iTRAQ_f9.raw", NativeID:"controllerType=0 controllerNumber=1 scan=1666"   true        300.14413   3           0.0
SPEC    #file=w:\test\Mo_Tai_iTRAQ_f9.mgf#id=index=520#title=Mo_Tai_iTRAQ_f9.1621.1621.3 File:"Mo_Tai_iTRAQ_f9.raw", NativeID:"controllerType=0 controllerNumber=1 scan=1621"   true        300.14197   3           0.0
SPEC    #file=w:\test\Mo_Tai_iTRAQ_f2.mgf#id=index=7171#title=Mo_Tai_iTRAQ_f2.12729.12729.2 File:"Mo_Tai_iTRAQ_f2.raw", NativeID:"controllerType=0 controllerNumber=1 scan=12729"   true        300.15695   2           0.0)");
    boost::spirit::istream_iterator begin(input);
    boost::spirit::istream_iterator end;

    bool r = phrase_parse(begin, end, g, qi::blank);

    if (r && begin == end) {
        std::cout << "Parsing succeeded\n";
    else {
        std::cout << "Parsing failed\n";

    if (begin!=end) {
        std::cout << "Unparsed remaining input: '" << std::string(begin, end) << "\n";

    return (r && begin==end)? 0 : 1;


Parsing succeeded

BONUS: Threaded workers

Here's a version that dispatches the clusters for asynchronous processing on a thread pool.

Note that the submit method posts a lambda to the service. The lambda captures by value because the lifetime of the parameters should extend during the processing.

Live On Coliru

#include <boost/asio.hpp>
#include <boost/thread.hpp>
namespace ba = boost::asio;

struct Processing {
    Processing() {
        for (unsigned i=0; i < boost::thread::hardware_concurrency(); ++i)
            _threads.create_thread([this] {; });

    ~Processing() {

    void submit(std::string const& name, model::cluster const& cluster) {[=] { do_processing(name, cluster); });

    void do_processing(std::string const& name, model::cluster const& cluster) {
        std::cout << "Thread " << boost::this_thread::get_id() << ": " << name << " cluster of " << cluster.spectra.size() << " spectra\n";

    ba::io_service _svc;
    boost::optional<ba::io_service::work> _work = ba::io_service::work(_svc);
    boost::thread_group _threads;

[...snip...] and in main:

Processing processing;
auto handler = [&processing](auto&... args) { processing.submit(args...); };

cluster_parser<boost::spirit::istream_iterator> g(handler); // Our grammar

The rest is unmodified, and now it prints (e.g.):

Thread 7f0144a5b700: GreedyClustering_0.99 cluster of 1 spectra
Thread 7f014425a700: GreedyClustering_0.99 cluster of 2 spectra
Parsing succeeded
Thread 7f0143a59700: GreedyClustering_0.99 cluster of 1 spectra
登录 后发表回答