How to speed up a Monte Carlo simulation in Python

Posted 2019-09-19 08:20

Question:

I have written a poker simulator that estimates the probability of winning in Texas hold'em by simulating many games (i.e. a Monte Carlo simulation). It currently runs around 10,000 simulations in 10 seconds, which is generally good enough. However, there are iPhone apps that run around 100x faster, so I'm wondering whether there is anything I can do to speed up the program significantly. I have already replaced strings with lists and found several other changes that made the program around 2-3 times faster. But what can I do to speed it up by 50x-100x? I checked with a profiler but couldn't find any significant bottleneck. I also compiled it with Cython (without making any changes), but that had no effect on speed either. Any suggestions are appreciated. The full listing is below:

__author__ = 'Nicolas Dickreuter'
import time
import numpy as np
from collections import Counter

class MonteCarlo(object):

    def EvalBestHand(self, hands):
        scores = [(i, self.score(hand)) for i, hand in enumerate(hands)]
        winner = sorted(scores, key=lambda x: x[1], reverse=True)[0][0]
        return hands[winner],scores[winner][1][-1]

    def score(self, hand):
        crdRanksOriginal = '23456789TJQKA'
        originalSuits='CDHS'
        rcounts = {crdRanksOriginal.find(r): ''.join(hand).count(r) for r, _ in hand}.items()
        score, crdRanks = zip(*sorted((cnt, rank) for rank, cnt in rcounts)[::-1])

        potentialThreeOfAKind = score[0] == 3
        potentialTwoPair = score == (2, 2, 1, 1, 1)
        potentialPair = score == (2, 1, 1, 1, 1, 1)

        if score[0:2]==(3,2) or score[0:2]==(3,3): # fullhouse (three of a kind and pair, or two three of a kind)
            crdRanks = (crdRanks[0],crdRanks[1])
            score = (3,2)
        elif score[0:4]==(2,2,2,1):
            score=(2,2,1) # three pair are not worth more than two pair
            sortedCrdRanks = sorted(crdRanks,reverse=True) # avoid for example 11,8,6,7
            crdRanks=(sortedCrdRanks[0],sortedCrdRanks[1],sortedCrdRanks[2],sortedCrdRanks[3])

        elif len(score) >= 5:  # high card, flush, straight and straight flush
            # straight
            if 12 in crdRanks:  # adjust if 5 high straight
                crdRanks += (-1,)
            sortedCrdRanks = sorted(crdRanks,reverse=True)  # sort again as if pairs the first rank matches the pair
            for i in range(len(sortedCrdRanks) - 4):
                straight = sortedCrdRanks[i] - sortedCrdRanks[i + 4] == 4
                if straight:
                    crdRanks=(sortedCrdRanks[i],sortedCrdRanks[i+1],sortedCrdRanks[i+2],sortedCrdRanks[i+3],sortedCrdRanks[i+4])
                    break

            # flush
            suits = [s for _, s in hand]
            flush = max(suits.count(s) for s in suits) >= 5
            if flush:
                for flushSuit in originalSuits: # get the suit of the flush
                    if suits.count(flushSuit)>=5:
                        break

                flushHand = [k for k in hand if flushSuit in k]
                rcountsFlush = {crdRanksOriginal.find(r): ''.join(flushHand).count(r) for r, _ in flushHand}.items()
                score, crdRanks = zip(*sorted((cnt, rank) for rank, cnt in rcountsFlush)[::-1])
                crdRanks = tuple(sorted(crdRanks,reverse=True)) # ignore original sorting where pairs had influence

                # check for straight in flush
                if 12 in crdRanks:  # adjust if 5 high straight
                    crdRanks += (-1,)
                for i in range(len(crdRanks) - 4):
                    straight = crdRanks[i] - crdRanks[i + 4] == 4
                    if straight:  # stop at the highest straight so later windows cannot overwrite the result
                        break

            # no pair, straight, flush, or straight flush
            score = ([(5,), (2, 1, 2)], [(3, 1, 3), (5,)])[flush][straight]

        if score == (1,) and potentialThreeOfAKind: score = (3, 1)
        elif score == (1,) and potentialTwoPair: score = (2, 2, 1)
        elif score == (1,) and potentialPair: score = (2, 1, 1)

        if score[0]==5:
            handType="StraightFlush"
            # crdRanks=crdRanks[:5]  # five-card rule makes no difference; [:5] would be incorrect
        elif score[0]==4:
            handType="FourOfAKind"
            crdRanks=crdRanks[:2]
        elif score[0:2]==(3,2):
            handType="FullHouse"
            # crdRanks=crdRanks[:2] # already implemented above
        elif score[0:3]==(3,1,3):
            handType="Flush"
            crdRanks=crdRanks[:5] # !! to be verified !!
        elif score[0:3]==(3,1,2):
            handType="Straight"
            crdRanks=crdRanks[:5] # !! to be verified !!
        elif score[0:2]==(3,1):
            handType="ThreeOfAKind"
            crdRanks=crdRanks[:3]
        elif score[0:2]==(2,2):
            handType="TwoPair"
            crdRanks=crdRanks[:3]
        elif score[0]==2:
            handType="Pair"
            crdRanks=crdRanks[:4]
        elif score[0]==1:
            handType="HighCard"
            crdRanks=crdRanks[:5]
        else: raise Exception('Card Type error!')

        return score, crdRanks, handType

    def createCardDeck(self):
        values = "23456789TJQKA"
        suites = "CDHS"
        # Build the deck directly instead of using a comprehension for its side effects
        return [x + y for x in values for y in suites]

    def distributeToPlayers(self, Deck, PlayerAmount, PlayerCardList, TableCardsList):
        Players =[]
        CardsOnTable = []
        knownPlayers = 0

        for PlayerCards in PlayerCardList:
            FirstPlayer=[]
            FirstPlayer.append(Deck.pop(Deck.index(PlayerCards[0])))
            FirstPlayer.append(Deck.pop(Deck.index(PlayerCards[1])))
            Players.append(FirstPlayer)

            knownPlayers += 1

        for c in TableCardsList:
            CardsOnTable.append(Deck.pop(Deck.index(c)))  # remove cards that are on the table from the deck

        for n in range(0, PlayerAmount - knownPlayers):
            plr=[]
            # np.random.random_integers is deprecated; randint excludes the upper bound
            plr.append(Deck.pop(np.random.randint(0, len(Deck))))
            plr.append(Deck.pop(np.random.randint(0, len(Deck))))
            Players.append(plr)

        return Players, Deck

    def distributeToTable(self, Deck, TableCardsList):
        remaningRandoms = 5 - len(TableCardsList)
        for n in range(0, remaningRandoms):
            # np.random.random_integers is deprecated; randint excludes the upper bound
            TableCardsList.append(Deck.pop(np.random.randint(0, len(Deck))))

        return TableCardsList

    def RunMonteCarlo(self, originalPlayerCardList, originalTableCardsList, PlayerAmount, gui, maxRuns=6000,maxSecs=5):
        winnerlist = []
        EquityList = []
        winnerCardTypeList=[]
        wins = 0
        runs=0
        timeout_start=time.time()
        for m in range(maxRuns):
            runs+=1
            try:
                if gui.active==True:
                    gui.progress["value"] = int(round(m*100/maxRuns))
            except:
                pass
            Deck = self.createCardDeck()
            PlayerCardList = originalPlayerCardList[:]
            TableCardsList = originalTableCardsList[:]
            Players, Deck = self.distributeToPlayers(Deck, PlayerAmount, PlayerCardList, TableCardsList)
            Deck5Cards = self.distributeToTable(Deck, TableCardsList)
            PlayerFinalCardsWithTableCards = []
            for o in range(0, PlayerAmount):
                PlayerFinalCardsWithTableCards.append(Players[o]+Deck5Cards)

            bestHand,winnerCardType=self.EvalBestHand(PlayerFinalCardsWithTableCards)
            winner = (PlayerFinalCardsWithTableCards.index(bestHand))


            #print (winnerCardType)

            CollusionPlayers = 0
            if winner < CollusionPlayers + 1:
                wins += 1
                winnerCardTypeList.append(winnerCardType)
                # winnerlist.append(winner)
                # self.equity=wins/m
                # if self.equity>0.99: self.equity=0.99
                # EquityList.append(self.equity)
            if time.time()>timeout_start+maxSecs:
                break

        self.equity = wins / runs
        self.winnerCardTypeList = Counter(winnerCardTypeList)
        for key, value in self.winnerCardTypeList.items():
            self.winnerCardTypeList[key] = value / runs

        self.winTypesDict=self.winnerCardTypeList.items()

        # show how the montecarlo converges
        # xaxis=range(500,monteCarloRuns)
        # plt.plot(xaxis,EquityList[499:monteCarloRuns])
        # plt.show()
        return self.equity,self.winTypesDict

if __name__ == '__main__':
    Simulation = MonteCarlo()
    mycards=[['AS', 'KS']]
    cardsOnTable = []
    players = 3
    start_time = time.time()
    Simulation.RunMonteCarlo(mycards, cardsOnTable, players, 1, maxRuns=200000, maxSecs=120)
    print("--- %s seconds ---" % (time.time() - start_time))
    equity = Simulation.equity # considering draws as wins
    print (equity)

Answer 1:

Inefficient list operations are probably causing most of the slowness. In particular, I see a lot of lines like this:

Deck.pop(some_index)

Every time you do that, the list has to shift all the elements after that index. You can eliminate this (as well as the repeated random-number calls) by shuffling a copy of the deck once per run and drawing from the end:

from random import shuffle

# Setup
originalDeck = createCardDeck()

# Individual run
Deck = originalDeck[:]
shuffle(Deck) # shuffles in place

# Draw a card from the end so nothing needs to shift
card = Deck.pop()

# Concise and efficient!
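
As a rough sketch of how the shuffle-and-pop idea could replace `distributeToPlayers` and `distributeToTable`, the function below deals one complete run. The names `FULL_DECK` and `deal_one_run` are hypothetical and not part of the original listing; the card strings follow the question's `createCardDeck` format.

```python
import random

RANKS = "23456789TJQKA"
SUITS = "CDHS"
# Hypothetical module-level constant: the deck is built once and reused for every run.
FULL_DECK = [r + s for r in RANKS for s in SUITS]


def deal_one_run(known_hole_cards, known_table_cards, player_amount, rng=random):
    """Deal one simulated game and return (players, table) as lists of card strings."""
    known = set(known_table_cards)
    for hole in known_hole_cards:
        known.update(hole)

    # Filter out the known cards in one pass instead of repeated Deck.index / Deck.pop(i).
    deck = [card for card in FULL_DECK if card not in known]
    rng.shuffle(deck)  # shuffles in place

    players = [list(hole) for hole in known_hole_cards]
    for _ in range(player_amount - len(known_hole_cards)):
        players.append([deck.pop(), deck.pop()])  # popping from the end shifts nothing

    table = list(known_table_cards)
    while len(table) < 5:
        table.append(deck.pop())
    return players, table
```

Calling `deal_one_run([['AS', 'KS']], [], 3)` would give three two-card hands (the first one being the known hand) and a five-card table, which can then be passed to the existing evaluation code. How much this helps depends on how much of the run time is spent dealing rather than scoring, so it is worth measuring.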

There are some other things you could do, such as replacing the long if-else chains with constant-time lookups via a dict or a custom class. Right now the code checks every hand against each hand type in turn, from straight flush down to high card, which seems rather inefficient. I doubt it would make a big difference, though.
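
To illustrate what a dict lookup might look like, here is a minimal sketch that classifies a hand by rank multiplicities only. It deliberately ignores straights and flushes, which the original `score` method handles separately, and the names `COUNT_PATTERN_TO_TYPE` and `classify_by_counts` are made up for this example.

```python
from collections import Counter

# Hypothetical lookup table: (largest rank count, second-largest rank count) -> hand type.
# Straights and flushes are NOT covered here; they need their own checks.
COUNT_PATTERN_TO_TYPE = {
    (4, 3): "FourOfAKind",
    (4, 2): "FourOfAKind",
    (4, 1): "FourOfAKind",
    (3, 3): "FullHouse",  # two sets of three still play as a full house
    (3, 2): "FullHouse",
    (3, 1): "ThreeOfAKind",
    (2, 2): "TwoPair",    # three pairs are not worth more than two pair
    (2, 1): "Pair",
    (1, 1): "HighCard",
}


def classify_by_counts(hand):
    """Classify a 5- to 7-card hand such as ['AS', 'KH', ...] by rank counts alone."""
    # Each card is a two-character string: rank followed by suit.
    counts = sorted(Counter(rank for rank, _suit in hand).values(), reverse=True)
    return COUNT_PATTERN_TO_TYPE[(counts[0], counts[1])]
```

For instance, `classify_by_counts(['AS', 'AH', 'AD', 'KC', 'KS', '2D', '3C'])` looks up the key `(3, 2)` and returns `"FullHouse"` with a single dictionary access instead of walking the chain.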



Answer 2:

This algorithm is too long and complex for any specific suggestion, so here is a generic one: vectorize everything you can, and work with arrays of data instead of loops, lists, insertions, and removals. Monte Carlo simulations lend themselves to this because all samples are independent.

For example, if you want 1 million trials, generate a NumPy array of 1 million random values in one line. To test the outcomes, place all 1 million of them into a NumPy array and test them at once against a given condition, producing a NumPy array of booleans.

If you manage to vectorize this, you will get tremendous performance increases.
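
A small sketch of that pattern, kept far simpler than full hand evaluation: it shuffles many decks at once and estimates the chance that the first two cards form a pocket pair. The function name and the integer card encoding (card id = rank * 4 + suit, matching the order produced by `createCardDeck`) are assumptions made for this example.

```python
import numpy as np


def estimate_pocket_pair_probability(n_trials=100_000, seed=0):
    """Estimate P(two hole cards share a rank) using whole-array operations only."""
    rng = np.random.default_rng(seed)
    # One shuffled deck per row: the argsort of i.i.d. uniform noise is a random permutation of 0..51.
    decks = np.argsort(rng.random((n_trials, 52)), axis=1)
    hole = decks[:, :2]   # first two cards of every shuffled deck
    ranks = hole // 4     # card id -> rank index 0..12 (four suits per rank)
    is_pair = ranks[:, 0] == ranks[:, 1]  # boolean array, one entry per trial
    return is_pair.mean()
```

The estimate should land near the exact value of 3/51 (about 0.059). Vectorizing the complete hand ranking would take considerably more work, but the structure stays the same: one array row per trial, with conditions evaluated on all rows at once.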