Question:
I tried to replace std::multiset with std::priority_queue, but I was disappointed with the speed results: the running time of the algorithm increased by 50%...
Here is how the operations correspond (priority_queue call -> multiset call):
top()  -> *knn.begin()
pop()  -> knn.erase(knn.begin())
push() -> knn.insert()
I am surprised by the speed of the priority_queue implementation; I expected different results (better for the PQ)... Conceptually, the multiset is being used as a priority queue. Why do the priority_queue and the multiset show such different performance, even with -O2?
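(A minimal, self-contained sketch of this correspondence, assuming the comparator keeps the element that should leave the queue first at begin(); this is just an illustration, not the original code:)
#include <set>

// Sketch of the substitution above, assuming the comparator keeps the
// element that should be popped first at begin().
void example()
{
    std::multiset<int> knn;
    knn.insert(42);             // push()  -> insert()
    int top = *knn.begin();     // top()   -> *begin()
    knn.erase(knn.begin());     // pop()   -> erase(begin())
    (void)top;                  // silence unused-variable warnings
}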
Average of ten runs, MSVS 2010, Win XP, 32-bit, method findAllKNN2() (see below, please):
MS
N time [s]
100 000 0.5
1 000 000 8
PQ
N time [s]
100 000 0.8
1 000 000 12
What could cause these results? No other changes to the source code were made... Thanks for your help.
MS Implementation:
template <typename Point>
struct TKDNodePriority
{
KDNode <Point> *node;
typename Point::Type priority;
TKDNodePriority() : node ( NULL ), priority ( 0 ) {}
TKDNodePriority ( KDNode <Point> *node_, typename Point::Type priority_ ) : node ( node_ ), priority ( priority_ ) {}
bool operator < ( const TKDNodePriority <Point> &n1 ) const
{
return priority > n1.priority;
}
};
template <typename Point>
struct TNNeighboursList
{
typedef std::multiset < TKDNodePriority <Point> > Type;
};
Method:
template <typename Point>
template <typename Point2>
void KDTree2D <Point>::findAllKNN2 ( const Point2 * point, typename TNNeighboursList <Point>::Type & knn, unsigned int k, KDNode <Point> *node, const unsigned int depth ) const
{
if ( node == NULL )
{
return;
}
if ( point->getCoordinate ( depth % 2 ) <= node->getData()->getCoordinate ( depth % 2 ) )
{
findAllKNN2 ( point, knn, k, node->getLeft(), depth + 1 );
}
else
{
findAllKNN2 ( point, knn, k, node->getRight(), depth + 1 );
}
typename Point::Type dist_q_node = ( node->getData()->getX() - point->getX() ) * ( node->getData()->getX() - point->getX() ) +
( node->getData()->getY() - point->getY() ) * ( node->getData()->getY() - point->getY() );
if (knn.size() == k)
{
if (dist_q_node < knn.begin()->priority )
{
knn.erase(knn.begin());
knn.insert ( TKDNodePriority <Point> ( node, dist_q_node ) );
}
}
else
{
knn.insert ( TKDNodePriority <Point> ( node, dist_q_node ) );
}
typename Point::Type dist_q_node_straight = ( point->getCoordinate ( node->getDepth() % 2 ) - node->getData()->getCoordinate ( node->getDepth() % 2 ) ) *
( point->getCoordinate ( node->getDepth() % 2 ) - node->getData()->getCoordinate ( node->getDepth() % 2 ) ) ;
typename Point::Type top_priority = knn.begin()->priority;
if ( knn.size() < k || dist_q_node_straight < top_priority )
{
if ( point->getCoordinate ( node->getDepth() % 2 ) < node->getData()->getCoordinate ( node->getDepth() % 2 ) )
{
findAllKNN2 ( point, knn, k, node->getRight(), depth + 1 );
}
else
{
findAllKNN2 ( point, knn, k, node->getLeft(), depth + 1 );
}
}
}
PQ implementation (slower, why?)
template <typename Point>
struct TKDNodePriority
{
KDNode <Point> *node;
typename Point::Type priority;
TKDNodePriority() : node ( NULL ), priority ( 0 ) {}
TKDNodePriority ( KDNode <Point> *node_, typename Point::Type priority_ ) : node ( node_ ), priority ( priority_ ) {}
bool operator < ( const TKDNodePriority <Point> &n1 ) const
{
return priority > n1.priority;
}
};
template <typename Point>
struct TNNeighboursList
{
typedef std::priority_queue< TKDNodePriority <Point> > Type;
};
Method:
template <typename Point>
template <typename Point2>
void KDTree2D <Point>::findAllKNN2 ( const Point2 * point, typename TNNeighboursList <Point>::Type & knn, unsigned int k, KDNode <Point> *node, const unsigned int depth ) const
{
if ( node == NULL )
{
return;
}
if ( point->getCoordinate ( depth % 2 ) <= node->getData()->getCoordinate ( depth % 2 ) )
{
findAllKNN2 ( point, knn, k, node->getLeft(), depth + 1 );
}
else
{
findAllKNN2 ( point, knn, k, node->getRight(), depth + 1 );
}
typename Point::Type dist_q_node = ( node->getData()->getX() - point->getX() ) * ( node->getData()->getX() - point->getX() ) +
( node->getData()->getY() - point->getY() ) * ( node->getData()->getY() - point->getY() );
if (knn.size() == k)
{
if (dist_q_node < knn.top().priority )
{
knn.pop();
knn.push ( TKDNodePriority <Point> ( node, dist_q_node ) );
}
}
else
{
knn.push ( TKDNodePriority <Point> ( node, dist_q_node ) );
}
typename Point::Type dist_q_node_straight = ( point->getCoordinate ( node->getDepth() % 2 ) - node->getData()->getCoordinate ( node->getDepth() % 2 ) ) *
( point->getCoordinate ( node->getDepth() % 2 ) - node->getData()->getCoordinate ( node->getDepth() % 2 ) ) ;
typename Point::Type top_priority = knn.top().priority;
if ( knn.size() < k || dist_q_node_straight < top_priority )
{
if ( point->getCoordinate ( node->getDepth() % 2 ) < node->getData()->getCoordinate ( node->getDepth() % 2 ) )
{
findAllKNN2 ( point, knn, k, node->getRight(), depth + 1 );
}
else
{
findAllKNN2 ( point, knn, k, node->getLeft(), depth + 1 );
}
}
}
Answer 1:
It appears that your compiler's optimization settings may have a large impact on performance. In the code below, multiset handily beats vector-based and deque-based priority_queue without optimization. However, with "-O3" optimization, the vector-based priority queue beats all. Now, these experiments were run on Linux with GCC, so perhaps you would get different results on Windows. I believe enabling optimization may remove many error-checking behaviors in STL's vector.
Without Optimization:
pq-w-vector: 79.2997ms
pq-w-deque: 362.366ms
pq-w-multiset: 34.649ms
With -O2 Optimization:
pq-w-vector: 8.88154ms
pq-w-deque: 17.5233ms
pq-w-multiset: 12.5539ms
With -O3 Optimization:
pq-w-vector: 7.92462ms
pq-w-deque: 16.8028ms
pq-w-multiset: 12.3208ms
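On MSVC specifically (an assumption about the asker's setup, not something measured here), the checked-iterator machinery can dominate unoptimized std::vector timings; it can be disabled explicitly when building the benchmark. The file name below is hypothetical:
cl /O2 /EHsc /D_SECURE_SCL=0 /D_ITERATOR_DEBUG_LEVEL=0 benchmark.cpp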
Test Harness (don't forget to link with -lrt):
#include <iostream>
#include <queue>
#include <deque>
#include <set>
#include <ctime>
#include <cstdlib>
#include <unistd.h>
using namespace std;
template <typename T>
double run_test(T& pq, int size, int iterations)
{
struct timespec start, end;
for(int i = 0; i < size; ++i)
pq.push(rand());
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &start);
for(int i = 0; i < iterations; ++i)
{
if(rand()%2)
pq.pop();
else
pq.push(rand());
}
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &end);
end.tv_sec -= start.tv_sec;
end.tv_nsec -= start.tv_nsec;
if (end.tv_nsec < 0)
{
--end.tv_sec;
end.tv_nsec += 1000000000ULL;
}
return (end.tv_sec*1e3 + end.tv_nsec/1e6);
}
template <class T>
class multiset_pq: public multiset<T>
{
public:
multiset_pq(): multiset<T>() {};
void push(T elm) { this->insert(elm); }
void pop() { if(!this->empty()) this->erase(this->begin()); }
const T& top() { return *this->begin(); }
};
int main(void)
{
const int size = 5000;
const int iterations = 100000;
priority_queue<int, vector<int> > pqv;
priority_queue<int, deque<int> > pqd;
multiset_pq<int> pqms;
srand(time(0));
cout<<"pq-w-vector: "<<run_test(pqv, size, iterations)<<"ms"<<endl;
cout<<"pq-w-deque: "<<run_test(pqd, size, iterations)<<"ms"<<endl;
cout<<"pq-w-multiset: "<<run_test(pqms, size, iterations)<<"ms"<<endl;
return 0;
}
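A possible way to build and run the harness on Linux (the file name is hypothetical):
g++ -O3 pq_bench.cpp -o pq_bench -lrt
./pq_bench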
Answer 2:
First of all, the author didn't provide a minimal example of the code that leads to the mentioned performance drop. Second, the question was asked 8 years ago, and compilers have surely improved a lot since then.
I've made a benchmark example in which I take the first element in the queue and push it back with another priority (simulating a push of a new element without creating one), doing that for kNodesCount elements in a loop of kRunsCount iterations. I'm comparing priority_queue with multiset and multimap; I decided to include multimap for a more precise comparison. This simple test is very close to the author's use case, and I've also tried to reproduce the structs he used in the code samples.
#include <set>
#include <type_traits>
#include <vector>
#include <chrono>
#include <queue>
#include <map>
#include <cstdlib> // std::rand
#include <iostream>
template<typename T>
struct Point {
static_assert(std::is_integral<T>::value || std::is_floating_point<T>::value, "Incompatible type");
using Type = T;
T x;
T y;
};
template<typename T>
struct Node {
using Type = T;
Node<T> * left;
Node<T> * right;
T data;
};
template <typename T>
struct NodePriority {
using Type = T;
using DataType = typename T::Type;
Node<T> * node = nullptr;
DataType priority = static_cast<DataType>(0);
bool operator < (const NodePriority<T> & n1) const noexcept {
return priority > n1.priority;
}
bool operator > (const NodePriority<T> & n1) const noexcept {
return priority < n1.priority;
}
};
// descending order by default
template <typename T>
using PriorityQueueList = std::priority_queue<T>;
// greater used because of ascending order by default
template <typename T>
using MultisetList = std::multiset<T, std::greater<T>>;
// greater used because of ascending order by default
template <typename T>
using MultimapList = std::multimap<typename T::DataType, T, std::greater<typename T::DataType>>;
struct Inner {
template<template <typename> class C, typename T>
static void Operate(C<T> & list, std::size_t priority);
template<typename T>
static void Operate(PriorityQueueList<T> & list, std::size_t priority) {
if (list.size() % 2 == 0) {
auto el = std::move(list.top());
el.priority = priority;
list.push(std::move(el));
}
else {
list.pop();
}
}
template<typename T>
static void Operate(MultisetList<T> & list, std::size_t priority) {
if (list.size() % 2 == 0) {
auto el = std::move(*list.begin());
el.priority = priority;
list.insert(std::move(el));
}
else {
list.erase(list.begin());
}
}
template<typename T>
static void Operate(MultimapList<T> & list, std::size_t priority) {
if (list.size() % 2 == 0) {
auto el = std::move(*list.begin());
auto & elFirst = const_cast<int&>(el.first);
elFirst = priority;
el.second.priority = priority;
list.insert(std::move(el));
}
else {
list.erase(list.begin());
}
}
};
template<typename T>
void doOperationOnPriorityList(T & list) {
for (std::size_t pos = 0, len = list.size(); pos < len; ++pos) {
// move top element and update priority
auto priority = std::rand() % 10;
Inner::Operate(list, priority);
}
}
template<typename T>
void measureOperationTime(T & list, std::size_t runsCount) {
std::chrono::system_clock::time_point t1, t2;
std::uint64_t totalTime(0);
for (std::size_t i = 0; i < runsCount; ++i) {
t1 = std::chrono::system_clock::now();
doOperationOnPriorityList(list);
t2 = std::chrono::system_clock::now();
auto castedTime = std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1).count();
std::cout << "Run " << i << " time: " << castedTime << "\n";
totalTime += castedTime;
}
std::cout << "Average time is: " << totalTime / runsCount << " ms" << std::endl;
}
int main() {
// consts
const int kNodesCount = 10'000'000;
const int kRunsCount = 10;
// prepare data
PriorityQueueList<NodePriority<Point<int>>> neighboursList1;
MultisetList<NodePriority<Point<int>>> neighboursList2;
MultimapList<NodePriority<Point<int>>> neighboursList3;
std::vector<Node<Point<int>>> nodes;
nodes.reserve(kNodesCount);
for (auto i = 0; i < kNodesCount; ++i) {
nodes.emplace_back(decltype(nodes)::value_type{ nullptr, nullptr, { 0,0 } });
auto priority = std::rand() % 10;
neighboursList1.emplace(decltype(neighboursList1)::value_type{ &nodes.back(), priority });
neighboursList2.emplace(decltype(neighboursList2)::value_type{ &nodes.back(), priority });
neighboursList3.emplace(decltype(neighboursList3)::value_type{ priority, { &nodes.back(), priority } });
}
// do operation on data
std::cout << "\nPriority queue\n";
measureOperationTime(neighboursList1, kRunsCount);
std::cout << "\nMultiset\n";
measureOperationTime(neighboursList2, kRunsCount);
std::cout << "\nMultimap\n";
measureOperationTime(neighboursList3, kRunsCount);
return 0;
}
I've made a release build with /Ox using VS v15.8.9. Take a look at the results for 10'000'000 items over 10 runs:
Priority queue
Run 0 time: 764
Run 1 time: 933
Run 2 time: 920
Run 3 time: 813
Run 4 time: 991
Run 5 time: 862
Run 6 time: 902
Run 7 time: 1277
Run 8 time: 774
Run 9 time: 771
Average time is: 900 ms
Multiset
Run 0 time: 2235
Run 1 time: 1811
Run 2 time: 1755
Run 3 time: 1535
Run 4 time: 1475
Run 5 time: 1388
Run 6 time: 1482
Run 7 time: 1431
Run 8 time: 1347
Run 9 time: 1347
Average time is: 1580 ms
Multimap
Run 0 time: 2197
Run 1 time: 1885
Run 2 time: 1725
Run 3 time: 1671
Run 4 time: 1500
Run 5 time: 1403
Run 6 time: 1411
Run 7 time: 1420
Run 8 time: 1409
Run 9 time: 1362
Average time is: 1598 ms
Hmm, as you can see, multiset performs just about the same as multimap, and priority_queue is the fastest (around 43% faster). So why does that happen?
Let's start with priority_queue. The C++ standard doesn't tell us how to implement one or another container or structure, but in most cases it's based on a binary heap (look at the msvc and gcc implementations). In the case of priority_queue you have no access to any element except the top; you can't iterate through the elements, get one by index, or even take the last one (that leaves some room for optimization). The average insert into a binary heap is O(1) and only the worst case is O(log n); deletion is O(log n), since we take an element from the bottom and then search for the next highest priority.
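A rough sketch of what the adaptor does on top of its contiguous vector; this mirrors how push/pop are specified in terms of the standard heap algorithms and is not the author's code:
#include <algorithm>
#include <vector>

// Rough sketch of the vector-backed heap behind std::priority_queue:
// push appends then sifts up, pop swaps the top to the back then sifts down.
void heap_push(std::vector<int> & c, int value)
{
    c.push_back(value);                  // amortized O(1), occasional realloc
    std::push_heap(c.begin(), c.end());  // O(log n) sift-up, no per-element allocation
}

void heap_pop(std::vector<int> & c)
{
    std::pop_heap(c.begin(), c.end());   // move the max to the back, restore heap in O(log n)
    c.pop_back();                        // O(1), memory stays contiguous
}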
What about multimap and multiset? They are both usually implemented on top of a red-black tree (look at the msvc and gcc implementations), where the average insert is O(log n) and deletion is O(log n) as well.
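As a small illustration (my own sketch, not part of the benchmark above, and assuming a typical node-based implementation): a counting allocator makes visible that every multiset insert allocates a separate tree node, a per-operation cost the vector-backed heap avoids.
#include <cstddef>
#include <cstdio>
#include <new>
#include <set>

// Sketch: count how many raw allocations a multiset performs.
// In typical implementations each insert allocates one red-black tree node.
static std::size_t g_allocations = 0;

template <typename T>
struct CountingAllocator {
    using value_type = T;
    CountingAllocator() = default;
    template <typename U> CountingAllocator(const CountingAllocator<U> &) {}
    T * allocate(std::size_t n) {
        ++g_allocations;
        return static_cast<T *>(::operator new(n * sizeof(T)));
    }
    void deallocate(T * p, std::size_t) { ::operator delete(p); }
};

template <typename T, typename U>
bool operator==(const CountingAllocator<T> &, const CountingAllocator<U> &) { return true; }
template <typename T, typename U>
bool operator!=(const CountingAllocator<T> &, const CountingAllocator<U> &) { return false; }

int main() {
    std::multiset<int, std::less<int>, CountingAllocator<int>> ms;
    for (int i = 0; i < 1000; ++i) ms.insert(i);
    std::printf("allocations: %zu\n", g_allocations); // prints roughly 1000
    return 0;
}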
From this point of view, priority_queue can NEVER be slower than multiset or multimap. So, back to your question: multiset used as a priority queue is NOT faster than priority_queue itself. There might be plenty of reasons for your results, including a raw priority_queue implementation in an old compiler or incorrect usage of the structure (the question doesn't contain a minimal workable example). Besides, the author didn't mention compile flags or compiler version, and sometimes optimization makes a significant difference.
UPDATE 1 upon @noɥʇʎԀʎzɐɹƆ request
Unfortunately I don't have access to a Linux environment right now, but I have mingw-w64 installed, version info: g++.exe (x86_64-posix-seh, Built by strawberryperl.com project) 8.3.0. The processor used is the same as for the Visual Studio runs: Intel(R) Core(TM) i7-8550U CPU @ 1.80GHz, 2001 MHz, 4 Core(s), 8 Logical Processor(s).
So the results for g++ -O2 are:
Priority queue
Run 0 time: 775
Run 1 time: 995
Run 2 time: 901
Run 3 time: 807
Run 4 time: 930
Run 5 time: 765
Run 6 time: 799
Run 7 time: 1151
Run 8 time: 760
Run 9 time: 780
Average time is: 866 ms
Multiset
Run 0 time: 2280
Run 1 time: 1942
Run 2 time: 1607
Run 3 time: 1344
Run 4 time: 1319
Run 5 time: 1210
Run 6 time: 1129
Run 7 time: 1156
Run 8 time: 1244
Run 9 time: 992
Average time is: 1422 ms
Multimap
Run 0 time: 2530
Run 1 time: 1958
Run 2 time: 1670
Run 3 time: 1390
Run 4 time: 1391
Run 5 time: 1235
Run 6 time: 1088
Run 7 time: 1198
Run 8 time: 1071
Run 9 time: 963
Average time is: 1449 ms
You may notice it's almost the same picture as for msvc.
UPDATE 2 thanks to @JorgeBellon
A quick-bench.com online benchmark link, check it yourself!
Would like to see any additions to my post, cheers!
Answer 3:
The implementation of the priority_queue is the culprit, as far as I understand. priority_queues are implemented (underneath) on top of a specialized vector or deque, because a priority_queue needs random-access iterators. When you pop or push items into the priority_queue, the remaining items in the queue need to be copied to the empty space, and the same happens on insertion. multi_set is based on keys.
EDIT: Terribly sorry, multi_set is not based on a hash key. I confused it with multi_map for some reason. But multi_set is a multiple sorted associative container and stores elements based on the key, which is the same as the value. Due to the way elements are stored in a multi_set, it
...has the important property that inserting a new element into a multi_set does not invalidate iterators that point to existing elements. Erasing an element from a multi_set also does not invalidate any iterators, except, of course, for iterators that actually point to the element that is being erased.
-- Quoted from the SGI documentation.
This means that the storage of a multi_set is not linear, and hence the performance gain.
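A tiny sketch of the quoted property (hypothetical values, purely illustrative): iterators into a multiset stay valid across inserts, because each element lives in its own node.
#include <cassert>
#include <set>

// Sketch of the iterator-stability guarantee quoted above.
void iterator_stability()
{
    std::multiset<int> ms{1, 5, 9};
    auto it = ms.find(5);
    ms.insert(3);        // node-based insert: existing iterators stay valid
    ms.insert(7);
    assert(*it == 5);    // 'it' still points at the same element
}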
Answer 4:
This non-synthetic benchmark is taken from an actual usage of priority_queue. Feed this file in as stdin to run the benchmark.
// TOPOSORT 2
// This short function computes the lexicographically smallest toposort.
// priority_queue vs multiset benchmark
#include <vector>
#include <queue>
#include <set>
#include <unordered_set>
#include <ctime>
#include <chrono>
#include <cstdio> // scanf
#include <iostream>
// https://stackoverflow.com/a/13772771/1459669
#ifdef _WIN32
#include <intrin.h>
#else
#include <x86intrin.h>
#endif
using namespace std;
constexpr int MAXN = 100001;
struct Tail {
int observation, number;
};
typedef vector<vector<Tail>> AdjList;
int N, M;
void computeNumIncomingEdges(int observationID, AdjList adjacency_list,
int *numIncomingEdges) {
for (int node = 0; node <= N; ++node) {
numIncomingEdges[node] = 0;
}
for (int node = 1; node <= N; ++node) {
for (Tail tail : adjacency_list[node]) {
if (tail.observation <= observationID) {
numIncomingEdges[tail.number]++;
}
}
}
}
template<class T>
vector<int> toposort2_PQ(int observationID, AdjList adjacency_list) {
vector<int> sortedElements;
priority_queue<int, T, std::greater<int>> S;
static int numIncomingEdges[MAXN];
computeNumIncomingEdges(observationID, adjacency_list, numIncomingEdges);
for (int node = 1; node <= N; ++node) {
if (numIncomingEdges[node] == 0)
S.push(node);
}
while (!S.empty()) {
auto n = S.top();
S.pop();
sortedElements.push_back(n);
for (int _ = adjacency_list[n].size() - 1; _ >= 0; --_) {
Tail m = adjacency_list[n][_];
if (m.observation <= observationID) {
adjacency_list[n].pop_back();
numIncomingEdges[m.number]--;
if (numIncomingEdges[m.number] == 0)
S.push(m.number);
}
}
}
bool graphStillHasEdges = false;
for (int node = 1; node <= N; ++node)
if (numIncomingEdges[node] > 0) {
graphStillHasEdges = true;
break;
}
return sortedElements;
}
vector<int> toposort2_multiset(int observationID, AdjList adjacency_list) {
vector<int> sortedElements;
multiset<int, std::greater<int>> S;
static int numIncomingEdges[MAXN];
computeNumIncomingEdges(observationID, adjacency_list, numIncomingEdges);
for (int node = 1; node <= N; ++node) {
if (numIncomingEdges[node] == 0)
S.insert(node);
}
while (!S.empty()) {
int n = *S.begin();
S.erase(S.begin());
sortedElements.push_back(n);
for (int _ = adjacency_list[n].size() - 1; _ >= 0; --_) {
Tail m = adjacency_list[n][_];
if (m.observation <= observationID) {
adjacency_list[n].pop_back();
numIncomingEdges[m.number]--;
if (numIncomingEdges[m.number] == 0)
S.insert(m.number);
}
}
}
bool graphStillHasEdges = false;
for (int node = 1; node <= N; ++node)
if (numIncomingEdges[node] > 0) {
graphStillHasEdges = true;
break;
}
return sortedElements;
}
int main() {
scanf("%d %d", &N, &M);
AdjList adjacency_list(MAXN);
for (int observation = 0; observation < M; ++observation) {
int observationSize;
scanf("%d", &observationSize);
int head;
scanf("%d", &head);
for (int i = 0; i < observationSize - 1; ++i) {
int tail;
scanf("%d", &tail);
Tail to_insert;
to_insert.observation = observation;
to_insert.number = tail;
adjacency_list[head].push_back(to_insert);
head = tail;
}
}
for (int i = 0; i < 5; ++i) {
auto start_pq = std::chrono::high_resolution_clock::now();
toposort2_PQ<vector<int>>(3182, adjacency_list);
auto end_pq = std::chrono::high_resolution_clock::now();
auto start_pq_dq = std::chrono::high_resolution_clock::now();
toposort2_PQ<deque<int>>(3182, adjacency_list);
auto end_pq_dq = std::chrono::high_resolution_clock::now();
auto start_ms = std::chrono::high_resolution_clock::now();
toposort2_multiset(3182, adjacency_list);
auto end_ms = std::chrono::high_resolution_clock::now();
std::cout << std::chrono::duration_cast<std::chrono::microseconds>(end_pq-start_pq).count() << ' ' << std::chrono::duration_cast<std::chrono::microseconds>(end_pq_dq-start_pq_dq).count() << ' ' << std::chrono::duration_cast<std::chrono::microseconds>(end_ms-start_ms).count() << endl;
}
}
Using clang++ with -O2 gets me (microseconds: PQ with vector, PQ with deque, multiset):
31622 37891 54884
27092 33919 54878
27324 35870 51427
27961 35348 53170
26746 34753 54191
In summary, priority_queue with vector consistently wins. In second place is priority_queue with deque, and in last is a multiset.
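A possible way to build and run it (file names are hypothetical; the graph description is piped in on stdin, as noted above):
clang++ -O2 toposort_bench.cpp -o toposort_bench
./toposort_bench < graph_input.txt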