Iteration performance

2019-06-07 14:57发布

I made a function to evaluate the following problem experimentally, taken from a A Primer for the Mathematics of Financial Engineering.

Problem: Let X be the number of times you must flip a fair coin until it lands heads. What are E[X] (expected value) and var(X) (variance)?

Following the textbook solution, the following code yields the correct answer:

from sympy import *

k = symbols('k')

Expected_Value = summation(k/2**k, (k, 1, oo)) # Both solutions work

Variance = summation(k**2/2**k, (k, 1, oo)) - Expected_Value**2

To validate this answer, I decided to have a go at making a function to simulate this experiment. The following code is what I came up with.

def coin_toss(toss, probability=[0.5, 0.5]): 

    """Computes expected value and variance for coin toss experiment"""

    flips = [] # Collects total number of throws until heads appear per experiment.

    for _ in range(toss): # Simulate n flips

        number_flips=[] # Number of flips until heads is tossed  

        while sum(number_flips) == 0: # Continue simulation while Tails are thrown 

            number_flips.append(np.random.choice(2, p=probability)) # Append result to number_flips

        flips.append(len(number_flips)) #Append number of flips until lands heads to flips

    Expected_Value, Variance  = np.mean(flips), np.var(flips)    

    print('E[X]: {}'.format(Expected_Value), 
          '\nvar[X]: {}'.format(Variance)) # Return expected value 

The run time if I simulate 1e6 experiments, using the following code is approximately 35.9 seconds.

   from timeit import Timer

   t1 = Timer("""coin_toss(1000000)""", """from __main__ import coin_toss""")

   print(t1.timeit(1))

In the interest of developing my understanding of Python, is this a particularly efficient/pythonic way of approaching a problem like this? How can I utilise existing libraries to improve efficiency/flow execution?

1条回答
2楼-- · 2019-06-07 15:14

In order to code in an efficient and pythonic way, you must take a look at PythonSpeed and NumPy. One exemple of a faster code using numpy can be found below.

The abc of optimizing in python+numpy is to vectorize operations, which in this case is quite dificult because there is a while that could actually be infinite, the coin can be flipped tails 40 times in a row. However, instead of doing a for with toss iterations, the work can be done in chunks. That is the main difference between coin_toss from the question and this coin_toss_2d approach.

coin_toss_2d

The main advantatge of coin_toss_2d is working by chunks, the size of these chunks has some default values, but they can be modified (as they will affect speed). Thus, it will only iterate over the while current_toss<toss a number of times toss%flips_at_a_time. This is achieved with numpy, which allows to generate a matrix with the results of repeating flips_at_a_time times the experiment of flipping a coin flips_per_try times. This matrix will contain 0 (tails) and 1 (heads).

# i.e. doing only 5 repetitions with 3 flips_at_a_time
flip_events = np.random.choice([0,1],size=(repetitions_at_a_time,flips_per_try),p=probability)
# Out 
[[0 0 0] # still no head, we will have to keep trying
 [0 1 1] # head at the 2nd try (position 1 in python)
 [1 0 0]
 [1 0 1]
 [0 0 1]]

Once this result is obtained, argmax is called. This finds the index corresponding to the maximum (which will be 1, a head) of each row (repetition) and in case of multiple occurences, returns the first one, which is exactly what is needed, the first head after a sequence of tails.

maxs = flip_events.argmax(axis=1)
# Out
[0 1 0 0 2] 
# The first position is 0, however, flip_events[0,0]!=1, it's not a head!

However, the case where all the row is 0 must be considered. In this case, the maximum will be 0, and its first occurence will also be 0, the first column (try). Therefore, we check that all the maximums found at the first try correspond to a head at the first try.

not_finished = (maxs==0) & (flip_events[:,0]!=1)
# Out
[ True False False False False] # first repetition is not finished

If that is not the case, we loop repeating that same process but only for the repetitions where there was no head in any of the tries.

n = np.sum(not_finished)
while n!=0: # while there are sequences without any head
    flip_events = np.random.choice([0,1],size=(n,flips_per_try),p=probability) # number of experiments reduced to n (the number of all tails sequences)
    maxs2 = flip_events.argmax(axis=1)
    maxs[not_finished] += maxs2+flips_per_try # take into account that there have been flips_per_try tries already (every iteration is added)
    not_finished2 = (maxs2==0) & (flip_events[:,0]!=1) 
    not_finished[not_finished] = not_finished2
    n = np.sum(not_finished)
# Out
# flip_events
[[1 0 1]] # Now there is a head
# maxs2
[0]
# maxs
[3 1 0 0 2] # The value of the still unfinished repetition has been updated,
# taking into account that the first position in flip_events is the 4th,
# without affecting the rest

Then the indexes corresponding to the first head occurence are stored (we have to add 1 because python indexing starts at zero instead of 1). There is one try ... except ... block to cope with cases where toss is not a multiple of repetitions_at_a_time.

def coin_toss_2d(toss, probability=[.5,.5],repetitions_at_a_time=10**5,flips_per_try=20):
    # Initialize and preallocate data
    current_toss = 0
    flips = np.empty(toss)
    # loop by chunks
    while current_toss<toss:
        # repeat repetitions_at_a_time times experiment "flip coin flips_per_try times"
        flip_events = np.random.choice([0,1],size=(repetitions_at_a_time,flips_per_try),p=probability)
        # store first head ocurrence
        maxs = flip_events.argmax(axis=1)
        # Check for all tails sequences, that is, repetitions were we have to keep trying to get a head
        not_finished = (maxs==0) & (flip_events[:,0]!=1)
        n = np.sum(not_finished)
        while n!=0: # while there are sequences without any head
            flip_events = np.random.choice([0,1],size=(n,flips_per_try),p=probability) # number of experiments reduced to n (the number of all tails sequences)
            maxs2 = flip_events.argmax(axis=1)
            maxs[not_finished] += maxs2+flips_per_try # take into account that there have been flips_per_try tries already (every iteration is added)
            not_finished2 = (maxs2==0) & (flip_events[:,0]!=1) 
            not_finished[not_finished] = not_finished2
            n = np.sum(not_finished)
        # try except in case toss is not multiple of repetitions_at_a_time, in general, no error is raised, that is why a try is useful
        try:
            flips[current_toss:current_toss+repetitions_at_a_time] = maxs+1
        except ValueError:
            flips[current_toss:] = maxs[:toss-current_toss]+1
        # Update current_toss and move to the next chunk
        current_toss += repetitions_at_a_time
    # Once all values are obtained, average and return them
    Expected_Value, Variance  = np.mean(flips), np.var(flips)    
    return Expected_Value, Variance

coin_toss_map

Here the code is basically the same, but now, the intrinsec while is done in a separate function, which is called from the wrapper function coin_toss_map using map.

def toss_chunk(args):
    probability,repetitions_at_a_time,flips_per_try = args
    # repeat repetitions_at_a_time times experiment "flip coin flips_per_try times"
    flip_events = np.random.choice([0,1],size=(repetitions_at_a_time,flips_per_try),p=probability)
    # store first head ocurrence
    maxs = flip_events.argmax(axis=1)
    # Check for all tails sequences
    not_finished = (maxs==0) & (flip_events[:,0]!=1)
    n = np.sum(not_finished)
    while n!=0: # while there are sequences without any head
        flip_events = np.random.choice([0,1],size=(n,flips_per_try),p=probability) # number of experiments reduced to n (the number of all tails sequences)
        maxs2 = flip_events.argmax(axis=1)
        maxs[not_finished] += maxs2+flips_per_try # take into account that there have been flips_per_try tries already (every iteration is added)
        not_finished2 = (maxs2==0) & (flip_events[:,0]!=1) 
        not_finished[not_finished] = not_finished2
        n = np.sum(not_finished)
    return maxs+1
def coin_toss_map(toss,probability=[.5,.5],repetitions_at_a_time=10**5,flips_per_try=20):
    n_chunks, remainder = divmod(toss,repetitions_at_a_time)
    args = [(probability,repetitions_at_a_time,flips_per_try) for _ in range(n_chunks)]
    if remainder:
        args.append((probability,remainder,flips_per_try))
    flips = np.concatenate(map(toss_chunk,args))
    # Once all values are obtained, average and return them
    Expected_Value, Variance  = np.mean(flips), np.var(flips)    
    return Expected_Value, Variance

Performance comparison

In my computer, I got the following computation time:

In [1]: %timeit coin_toss(10**6)
# Out 
# ('E[X]: 2.000287', '\nvar[X]: 1.99791891763')
# ('E[X]: 2.000459', '\nvar[X]: 2.00692478932')
# ('E[X]: 1.998118', '\nvar[X]: 1.98881045808')
# ('E[X]: 1.9987', '\nvar[X]: 1.99508631')
# 1 loop, best of 3: 46.2 s per loop

In [2]: %timeit coin_toss_2d(10**6,repetitions_at_a_time=5*10**5,flips_per_try=4)
# Out
# 1 loop, best of 3: 197 ms per loop

In [3]: %timeit coin_toss_map(10**6,repetitions_at_a_time=4*10**5,flips_per_try=4)
# Out
# 1 loop, best of 3: 192 ms per loop

And the results for the mean and variance are:

In [4]: [coin_toss_2d(10**6,repetitions_at_a_time=10**5,flips_per_try=10) for _ in range(4)]
# Out
# [(1.999848, 1.9990739768960009),
#  (2.000654, 2.0046035722839997),
#  (1.999835, 2.0072329727749993),
#  (1.999277, 2.001566477271)]

In [4]: [coin_toss_map(10**6,repetitions_at_a_time=10**5,flips_per_try=4) for _ in range(4)]
# Out
# [(1.999552, 2.0005057992959996),
#  (2.001733, 2.011159996711001),
#  (2.002308, 2.012128673136001),
#  (2.000738, 2.003613455356)]
查看更多
登录 后发表回答