Imagine you’re running a sports competition with multiple competitions going on and you need to keep track of the top 10 fastest scores across all disciplines. As each athlete finishes competing in one or more games they want to know what their spot on the leaderboard is.
What’s the fastest way to compute this across a range of competitions?
Given a n
x m
matrix like you can see below where the rows are the disciplines and columns the top 10 spots, figure out where player p
ranks in all disciplines based on their times. The existing leaderboard is sorted in ascending order, i.e. the fastest is first and the slowest last.
Here’s an example:
We have this leaderboard with the top 10 ranks across two disciplines.
1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | |
---|---|---|---|---|---|---|---|---|---|---|
400m sprint | 30 | 32 | 33 | 36 | 38 | 40 | 41 | 44 | 46 | 48 |
800m sprint | 60 | 65 | 66 | 68 | 70 | 73 | 75 | 80 | 85 | 90 |
Now Player p
has these times:
time | |
---|---|
400m sprint | 36 |
800m sprint | 74 |
Based on these descriptions, the expected result would be 4 and 7.
Let’s see how we can approach this problem and implement progressively faster solutions for it.
Validating Correctness
In order to validate and benchmark solutions, we need to begin by creating a function that validates the correctnes of the implementation because benchmarking wrong solution won’t tell us much.
import typing
import numpy as np
def test_implementation(callable: typing.Callable[[np.ndarray, np.ndarray], np.ndarray]):
# Arrange
current_leaderboard = np.array(
[
[30, 32, 33, 36, 38, 40, 41, 44, 46, 48],
[60, 65, 66, 68, 70, 73, 75, 80, 85, 90],
]
)
p_times = np.array([36, 74])
expected_result = np.array([4, 7])
# Act
actual_result = callable(current_leaderboard, p_times)
# Assert
assert np.array_equal(expected_result, actual_result)
This function expects an implementation in the form of a callable to be passed into it and will then check that it correctly solves the example we mentioned above.
Benchmarking
In order to find out how fast a given implementation is, we need to be able to generate sample data of different sizes, so I wrote a function for that:
def generate_sample_data(n_disciplines, top_n) -> np.ndarray:
np.random.seed(42)
all_disciplines = []
for _ in range(n_disciplines):
times = np.random.randint(1, 100, top_n)
times.sort()
all_disciplines.append(times)
leaderboard = np.array(all_disciplines)
p_times = np.random.randint(1, 120, n_disciplines)
return leaderboard, p_times
Here, generate_sample_data
expects the shape of the leaderboard as parameters and returns a random leaderboard as well as an array for player p
with times that lie either within or outside of the current leaderboard. Note that I’m setting a static seed for the randomizer so we’ll get the same matrix when we call the function with the same parameters again.
Next, we need a function to do the actual benchmark:
from timeit import timeit
def benchmark(callable: typing.Callable[[np.ndarray, np.ndarray], np.ndarray]):
timeit_repeats = 10_000
disciplines = 10_000
leaderboard_spots = 30
test_implementation(callable)
leaderboard, p_times = generate_sample_data(disciplines, leaderboard_spots)
runtime_in_s = timeit("callable(leaderboard, p_times)", number=timeit_repeats, globals=locals())
print(f"{callable.__name__} took {runtime_in_s:.04f} seconds to determine the position in a top {leaderboard_spots} leaderboard for {disciplines} disciplines {timeit_repeats} times")
First, the function tests that the given implementation produces valid results, then it creates sample data and lastly it uses the timeit
function to run the code 10.000 times and measures how long that takes.
Now that we have the means to test and benchmark, let’s begin writing some code.
Different Implementations
Naive Python
We’re starting out with this purely Python-based implementation:
import bisect
def naive_python(leaderboard: np.ndarray, p_times: np.ndarray) -> np.ndarray:
spots = []
for idx, discipline_leaderboard in enumerate(leaderboard):
p_time = p_times[idx]
spot = bisect.bisect_left(discipline_leaderboard, p_time) + 1
spots.append(spot)
return np.array(spots)
When I say purely Python, I mean that it sticks to the standard library aside from the input and output, which are numpy arrays according to the interface we specified for the function. This implementation uses bisect.bisect_left
to quickly find the insertion point for an item in a sorted list.
If we run benchmark(naive_python)
it gives us this output:
naive_python took 30.8476 seconds to determine the position
in a top 30 leaderboard for 10000 disciplines 10000 times
About 30 seconds to solve our problem 10k times isn’t too bad, but we can do better. Part of the problem is that we have to iterate through all the disciplines here and solve this sequentially, which becomes a bottleneck as the array dimensions get larger. The other issue is the conversion overhead between numpy and Python.
The key to making implementations faster is often to find a different approach to the problem that performs less work or supports more parallelization. So far this implementation has used the fact that we defined the leaderboard to be sorted when the function receives it. This means an intuitive approach is to go through each discipline and find the first element that’s larger than the one we want to insert. The binary-search based approach of bisect makes this quite fast as well. Unfortunately we can’t parallelize it easily as that tends to add significant overhead in Python.
A different way of looking at this problem is trying to understand what finding our position in the leaderboard entails. The position of a value in the leaderboard is equal to the number of items in the leaderboard smaller than itself. The neat thing is that the leaderboard doesn’t even have to be sorted for this approach to give a correct result.
Numpy v1
Let’s see how we can use this. My first approach was to subtract the times in p
from each position in the leaderboard. Any negative value would automatically mean that the value is lower than the value in p
and any positive value would be higher.
Getting p
into the right shape was a bit annoying, but the np.tile
method allowed me to do it.
After the matrices had the same shape I could subtract the values and set any positive numbers to 0 and the negative ones to 1.
Summing up the values in each row (sum with axis=1) gives us the 0-based position in each discipline so we add 1 to get the correct rank.
def numpy_v1(leaderboard: np.ndarray, p_times: np.ndarray) -> np.ndarray:
p_times_tiled = np.tile(p_times, (leaderboard.shape[1], 1)).T
difference_between_spots = leaderboard - p_times_tiled
difference_between_spots[difference_between_spots > 0] = 0
difference_between_spots[difference_between_spots < 0] = 1
return np.sum(difference_between_spots, axis=1) + 1
Running benchmark(numpy_v1)
showed this approach to be more than twice as fast as the initial implementation (~2.3x):
numpy_v1 took 13.0169 seconds to determine the position
in a top 30 leaderboard for 10000 disciplines 10000 times
Numpy v2
I didn’t like the np.tile
-based approach and messed around with the np.repeat
function until I got that to work and that made this a bit faster.
def numpy_v2(leaderboard: np.ndarray, p_times: np.ndarray) -> np.ndarray:
p_times_for_each_spot = np.repeat(p_times, leaderboard.shape[1]).reshape(leaderboard.shape)
difference_between_spots = leaderboard - p_times_for_each_spot
difference_between_spots[difference_between_spots > 0] = 0
difference_between_spots[difference_between_spots < 0] = 1
return np.sum(difference_between_spots, axis=1) + 1
The benchmark(numpy_v2)
showed this approach to be slightly faster - 2.5x vs 2.3x compared to the original implementation.
numpy_v2 took 11.6425 seconds to determine the position
in a top 30 leaderboard for 10000 disciplines 10000 times
Numpy v3
Then I realized that we don’t really need to subtract the values, since numpy allows us to compare the values using np.less
directly.
This allowed me to get rid of the subtraction as well as the smart indexing to set 0/1, because comparing the boolean results to integer got me the 1s and 0s I needed.
def numpy_v3(leaderboard: np.ndarray, p_times: np.ndarray) -> np.ndarray:
p_times_for_each_spot = np.repeat(p_times, leaderboard.shape[1]).reshape(leaderboard.shape)
is_lower_than_p_time = np.less(leaderboard, p_times_for_each_spot)
return is_lower_than_p_time.astype(int).sum(axis=1) + 1
Doing less work made it faster and benchmark(numpy_v3)
reported this to be ~3.6 times more performant than the original python implementation.
numpy_v3 took 8.2042 seconds to determine the position
in a top 30 leaderboard for 10000 disciplines 10000 times
Numpy v4
At that time I was still using repeat to give the arrays the same shape until I figured out a way to reshape it so numpy was able to do the comparison without repeat:
def numpy_v4(leaderboard: np.ndarray, p_times: np.ndarray) -> np.ndarray:
is_lower_than_p_time = np.less(leaderboard, p_times.reshape(leaderboard.shape[0], 1))
return is_lower_than_p_time.astype(int).sum(axis=1) + 1
Now this implementation is about 5.1x faster than the original, which is nice.
numpy_v4 took 5.8386 seconds to determine the position
in a top 30 leaderboard for 10000 disciplines 10000 times
Numpy v5
Then I realized that numpy probably treats the boolean values as 1/0 and tried removing the .astype(int)
step.
def numpy_v5(leaderboard: np.ndarray, p_times: np.ndarray) -> np.ndarray:
is_lower_than_p_time = np.less(leaderboard, p_times.reshape(leaderboard.shape[0], 1))
return is_lower_than_p_time.sum(axis=1) + 1
Lo and behold, the benchmark shows that this is now about 8.3x faster than the original implementation.
numpy_v5 took 3.7019 seconds to determine the position
in a top 30 leaderboard for 10000 disciplines 10000 times
Conclusion
The Numpy implementation is significantly faster because it’s able to leverage the vectorized operations built into the library. Even though binary search should be a lot faster in individual disciplines, the fact that we can’t really vectorize the operation means it’s running in Python itself and not the optimized C-code that Numpy uses.
— Maurice