My question is simple, but I find it difficult to get the point across concisely, so please allow me to explain step by step.
Suppose I have N items and N corresponding indices. Each item can be loaded using the corresponding index.
def load_item(index: int) -> ItemType:
    # Mostly just reading, but very slow.
    return item
I also have a function that takes two (loaded) items and calculates a score.
def calc_score(item_a: ItemType, item_b: ItemType) -> ScoreType:
    # Much faster than the load function.
    return score
Note that calc_score(a, b) == calc_score(b, a).
What I want to do is calculate the score for all 2-item combinations and find (at least) one combination that gives the maximum score.
This can be implemented as follows:
def dumb_solution(n: int) -> Tuple[int, int]:
    best_score = 0
    best_combination = None
    for index_a, index_b in itertools.combinations(range(n), 2):
        item_a = load_item(index_a)
        item_b = load_item(index_b)
        score = calc_score(item_a, item_b)
        if score > best_score:
            best_score = score
            best_combination = (index_a, index_b)
    return best_combination
However, this solution calls the load_item function 2*C(N,2) = N*(N-1) times, which is the bottleneck (for N = 1000, that is 999,000 loads for only 1,000 distinct items).
This can be resolved by using a cache. Unfortunately, the items are so large that it is impossible to keep them all in memory, so we need a size-limited cache.
from functools import lru_cache

@lru_cache(maxsize=M)
def load_item(index: int) -> ItemType:
    # Very slow process.
    return item
Note that M (the cache size) is much smaller than N (approximately N // 10 to N // 2).
The problem is that the typical sequence of combinations is not ideal for the LRU cache.
For instance, when N=6 and M=3, itertools.combinations generates the following sequence, and the number of calls of the load_item function is 17. (In the lists below, the comment after each pair shows the running load count for each element of the pair, with - meaning a cache hit.)
[
    (0, 1),  # 1, 2
    (0, 2),  # -, 3
    (0, 3),  # -, 4
    (0, 4),  # -, 5
    (0, 5),  # -, 6
    (1, 2),  # 7, 8
    (1, 3),  # -, 9
    (1, 4),  # -, 10
    (1, 5),  # -, 11
    (2, 3),  # 12, 13
    (2, 4),  # -, 14
    (2, 5),  # -, 15
    (3, 4),  # 16, 17
    (3, 5),  # -, -
    (4, 5),  # -, -
]
However, if I rearrange the above sequence as follows, the number of calls will be 10.
[
    (0, 1),  # 1, 2
    (0, 2),  # -, 3
    (1, 2),  # -, -
    (0, 3),  # -, 4
    (2, 3),  # -, -
    (0, 4),  # -, 5
    (3, 4),  # -, -
    (0, 5),  # -, 6
    (4, 5),  # -, -
    (1, 4),  # 7, -
    (1, 5),  # -, -
    (1, 3),  # -, 8
    (3, 5),  # -, -
    (2, 5),  # 9, -
    (2, 4),  # -, 10
]
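To double-check these counts, here is a minimal sketch that replays an ordering through functools.lru_cache and counts the loads. count_loads is just an illustrative helper, and the stub load_item returns the index itself instead of a real item:

import functools
import itertools

def count_loads(pairs, cache_size: int) -> int:
    # Replay an ordering of index pairs and count cache misses (loads).
    loads = 0

    @functools.lru_cache(maxsize=cache_size)
    def load_item(index: int) -> int:
        nonlocal loads
        loads += 1
        return index  # stub; the real loader returns a large item

    for a, b in pairs:
        load_item(a)
        load_item(b)
    return loads

print(count_loads(itertools.combinations(range(6), 2), 3))  # -> 17
reordered = [(0, 1), (0, 2), (1, 2), (0, 3), (2, 3), (0, 4), (3, 4),
             (0, 5), (4, 5), (1, 4), (1, 5), (1, 3), (3, 5), (2, 5), (2, 4)]
print(count_loads(reordered, 3))  # -> 10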
Question:
How can I generate a sequence of 2-item combinations that maximizes the cache hit rate?
What I tried:
The solution I came up with is to prioritize items that are already in the cache.
from collections import OrderedDict

def prioritizes_item_already_in_cache(n, cache_size):
    items = list(itertools.combinations(range(n), 2))
    cache = OrderedDict()
    reordered = []

    def update_cache(x, y):
        cache[x] = cache[y] = None
        cache.move_to_end(x)
        cache.move_to_end(y)
        while len(cache) > cache_size:
            cache.popitem(last=False)

    while items:
        # Find a pair where both are cached.
        for i, (a, b) in enumerate(items):
            if a in cache and b in cache:
                reordered.append((a, b))
                update_cache(a, b)
                del items[i]
                break
        else:
            # Find a pair where one of them is cached.
            for i, (a, b) in enumerate(items):
                if a in cache or b in cache:
                    reordered.append((a, b))
                    update_cache(a, b)
                    del items[i]
                    break
            else:
                # Cannot find item in cache.
                a, b = items.pop(0)
                reordered.append((a, b))
                update_cache(a, b)
    return reordered
For N=100, M=10, this sequence resulted in 1660 calls, which is about 1/3 of the typical sequence. For N=100, M=50, there are only 155 calls. So this looks like a promising approach.
Unfortunately, this function is too slow to be useful for large N: it rescans the remaining list of up to C(N,2) pairs at every step, so it is roughly O(N^4) overall. I was not able to finish for N=1000, but the actual data is in the tens of thousands. Also, it does not take into account how to select a pair when no cached item is found. Therefore, even if it were fast, it is doubtful that it is theoretically the best solution (so please note that my question is not how to make the above function faster).
(Edited) Here is the complete code including everyone's answers and the test and benchmark code.
import functools
import itertools
import math
import time
from collections import Counter, OrderedDict
from itertools import chain, combinations, product
from pathlib import Path
from typing import Callable, Iterable, Tuple

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from PIL import Image, ImageDraw

ItemType = int
ScoreType = int

def load_item(index: int) -> ItemType:
    return int(index)

def calc_score(item_a: ItemType, item_b: ItemType) -> ScoreType:
    return abs(item_a - item_b)

class LRUCacheWithCounter:
    def __init__(self, maxsize: int):
        def wrapped_func(key):
            self.load_count += 1
            return load_item(key)

        self.__cache = functools.lru_cache(maxsize=maxsize)(wrapped_func)
        self.load_count = 0

    def __call__(self, key: int) -> int:
        return self.__cache(key)

def basic_loop(iterator: Iterable[Tuple[int, int]], cached_load: Callable[[int], int]):
    best_score = 0
    best_combination = None
    for i, j in iterator:
        a = cached_load(i)
        b = cached_load(j)
        score = calc_score(a, b)
        if score > best_score:
            best_score = score
            best_combination = (i, j)
    return best_score, best_combination

def baseline(n, _):
    return itertools.combinations(range(n), 2)

def prioritizes(n, cache_size):
    items = list(itertools.combinations(range(n), 2))
    cache = OrderedDict()
    reordered = []

    def update_cache(x, y):
        cache[x] = cache[y] = None
        cache.move_to_end(x)
        cache.move_to_end(y)
        while len(cache) > cache_size:
            cache.popitem(last=False)

    while items:
        # Find a pair where both are cached.
        for i, (a, b) in enumerate(items):
            if a in cache and b in cache:
                reordered.append((a, b))
                update_cache(a, b)
                del items[i]
                break
        else:
            # Find a pair where one of them is cached.
            for i, (a, b) in enumerate(items):
                if a in cache or b in cache:
                    reordered.append((a, b))
                    update_cache(a, b)
                    del items[i]
                    break
            else:
                # Cannot find item in cache.
                a, b = items.pop(0)
                reordered.append((a, b))
                update_cache(a, b)
    return reordered

def Matt_solution(n: int, cache_size: int) -> Iterable[Tuple[int, int]]:
    dest = []

    def findPairs(lo1: int, n1: int, lo2: int, n2: int):
        if n1 < 1 or n2 < 1:
            return
        if n1 == 1:
            for i in range(max(lo1 + 1, lo2), lo2 + n2):
                dest.append((lo1, i))
        elif n2 == 1:
            for i in range(lo1, min(lo1 + n1, lo2)):
                dest.append((i, lo2))
        elif n1 >= n2:
            half = n1 // 2
            findPairs(lo1, half, lo2, n2)
            findPairs(lo1 + half, n1 - half, lo2, n2)
        else:
            half = n2 // 2
            findPairs(lo1, n1, lo2, half)
            findPairs(lo1, n1, lo2 + half, n2 - half)

    findPairs(0, n, 0, n)
    return dest

def Kelly_solution(n: int, cache_size: int) -> Iterable[Tuple[int, int]]:
    k = cache_size // 2
    r = range(n)
    return chain.from_iterable(
        combinations(r[i : i + k], 2) if i == j else product(r[i : i + k], r[j : j + k])
        for i in r[::k]
        for j in r[i::k]
    )

def Kelly_solution2(n: int, cache_size: int) -> Iterable[Tuple[int, int]]:
    k = cache_size - 2
    r = range(n)
    return chain.from_iterable(
        combinations(r[i : i + k], 2) if i == j else product(r[i : i + k], r[j : j + k])
        for i in r[::k]
        for j in r[i::k]
    )

def diagonal_block(lower, upper):
    for i in range(lower, upper + 1):
        for j in range(i + 1, upper + 1):
            yield i, j

def strip(i_lower, i_upper, j_lower, j_upper):
    for i in range(i_lower, i_upper + 1):
        for j in range(j_lower, j_upper + 1):
            yield i, j

def btilly_solution(n: int, cache_size: int):
    i_lower = 0
    i_upper = n - 1
    k = cache_size - 2
    is_asc = True
    while i_lower <= i_upper:
        # Handle a k*k block first. At the end that is likely loaded.
        if is_asc:
            upper = min(i_lower + k - 1, i_upper)
            yield from diagonal_block(i_lower, upper)
            j_lower = i_lower
            j_upper = upper
            i_lower = upper + 1
        else:
            lower = max(i_lower, i_upper - k + 1)
            yield from diagonal_block(lower, i_upper)
            j_lower = lower
            j_upper = i_upper
            i_upper = lower - 1
        yield from strip(i_lower, i_upper, j_lower, j_upper)
        is_asc = not is_asc

def btilly_solution2(n: int, cache_size: int):
    k = cache_size - 2
    for top in range(0, n, k):
        bottom = top + k
        # Diagonal part.
        for y in range(top, min(bottom, n)):  # Y-axis top to bottom
            for x in range(y + 1, min(bottom, n)):  # X-axis left to right
                yield y, x
        # Strip part.
        # Stripping right to left works well when cache_size is very small,
        # but makes little difference when it is not.
        for x in range(n - 1, bottom - 1, -1):  # X-axis right to left
            for y in range(top, min(bottom, n)):  # Y-axis top to bottom
                yield y, x

def btilly_solution3(n: int, cache_size: int):
    k = cache_size - 2
    r = range(n)
    for i in r[::k]:
        yield from combinations(r[i : i + k], 2)
        yield from product(r[i + k :], r[i : i + k])

def btilly_solution4(n: int, cache_size: int):
    def parts():
        k = cache_size - 2
        r = range(n)
        for i in r[::k]:
            yield combinations(r[i : i + k], 2)
            yield product(r[i + k :], r[i : i + k])

    return chain.from_iterable(parts())

def plot(df, series, ignore, y, label, title):
    df = df[df["name"].isin(series)]
    # plt.figure(figsize=(10, 10))
    for name, group in df.groupby("name"):
        plt.plot(group["n"], group[y], label=name)
    y_max = df[~df["name"].isin(ignore)][y].max()
    plt.ylim(0, y_max * 1.1)
    plt.xlabel("n")
    plt.ylabel(label)
    plt.title(title)
    plt.legend(loc="upper left")
    plt.tight_layout()
    plt.grid()
    plt.show()

def run(func, n, cache_ratio, output_dir: Path):
    cache_size = int(n * cache_ratio / 100)
    output_path = output_dir / f"{n}_{cache_ratio}_{func.__name__}.csv"
    if output_path.exists():
        return
    started = time.perf_counter()
    for a, b in func(n, cache_size):
        pass
    elapsed_iterate = time.perf_counter() - started
    # test_combinations(func(n, cache_size), n)
    started = time.perf_counter()
    cache = LRUCacheWithCounter(cache_size)
    basic_loop(iterator=func(n, cache_size), cached_load=cache)
    elapsed_cache = time.perf_counter() - started
    output_path.write_text(f"{func.__name__},{n},{cache_ratio},{cache_size},{cache.load_count},{elapsed_iterate},{elapsed_cache}")

def add_lower_bound(df):
    def calc_lower_bound(ni, mi):
        n = ni
        m = n * mi // 100
        return m + math.ceil((math.comb(n, 2) - math.comb(m, 2)) / (m - 1))

    return pd.concat(
        [
            df,
            pd.DataFrame(
                [
                    {"name": "lower_bound", "n": ni, "m": mi, "count": calc_lower_bound(ni, mi)}
                    for ni, mi in itertools.product(df["n"].unique(), df["m"].unique())
                ]
            ),
        ]
    )

def benchmark(output_dir: Path):
    log_dir = output_dir / "log"
    log_dir.mkdir(parents=True, exist_ok=True)
    candidates = [
        baseline,
        prioritizes,
        Matt_solution,
        Kelly_solution,
        Kelly_solution2,
        btilly_solution,
        btilly_solution2,
        btilly_solution3,
        btilly_solution4,
    ]
    nc = np.linspace(100, 500, num=9).astype(int)
    # nc = np.linspace(500, 10000, num=9).astype(int)[1:]
    # nc = np.linspace(10000, 100000, num=9).astype(int).tolist()[1:]
    print(nc)
    mc = np.linspace(10, 50, num=2).astype(int)
    print(mc)
    joblib.Parallel(n_jobs=1, verbose=5, batch_size=1)(
        [joblib.delayed(run)(func, ni, mi, log_dir) for ni in nc for mi in mc for func in candidates]
    )

def plot_graphs(output_dir: Path):
    log_dir = output_dir / "log"
    results = []
    for path in log_dir.glob("*.csv"):
        results.append(path.read_text().strip())
    (output_dir / "stat.csv").write_text("\n".join(results))
    df = pd.read_csv(output_dir / "stat.csv", header=None, names=["name", "n", "m", "size", "count", "time", "time_full"])
    df = add_lower_bound(df)
    df = df.sort_values(["name", "n", "m"])
    for m in [10, 50]:
        plot(
            df[df["m"] == m],
            series=[
                baseline.__name__,
                prioritizes.__name__,
                Matt_solution.__name__,
                Kelly_solution.__name__,
                Kelly_solution2.__name__,
                btilly_solution.__name__,
                "lower_bound",
            ],
            ignore=[
                baseline.__name__,
                prioritizes.__name__,
            ],
            y="count",
            label="load count",
            title=f"cache_size = {m}% of N",
        )
    plot(
        df[df["m"] == 10],
        series=[
            baseline.__name__,
            prioritizes.__name__,
            Matt_solution.__name__,
            Kelly_solution.__name__,
            Kelly_solution2.__name__,
            btilly_solution.__name__,
            btilly_solution2.__name__,
            btilly_solution3.__name__,
            btilly_solution4.__name__,
        ],
        ignore=[
            prioritizes.__name__,
            Matt_solution.__name__,
        ],
        y="time",
        label="time (sec)",
        title=f"cache_size = {10}% of N",
    )

class LRUCacheForTest:
    def __init__(self, maxsize: int):
        self.cache = OrderedDict()
        self.maxsize = maxsize
        self.load_count = 0

    def __call__(self, key: int) -> int:
        if key in self.cache:
            value = self.cache[key]
            self.cache.move_to_end(key)
        else:
            if len(self.cache) == self.maxsize:
                self.cache.popitem(last=False)
            value = load_item(key)
            self.cache[key] = value
            self.load_count += 1
        return value

    def hit(self, i, j):
        count = int(i in self.cache)
        self(i)
        count += int(j in self.cache)
        self(j)
        return count

def visualize():
    # Taken from https://mcmap.net/q/697569/-how-to-maximize-the-cache-hit-rate-of-the-2-element-combinations and modified.
    n, m = 100, 30
    func = btilly_solution2
    pairs = func(n, m)
    cache = LRUCacheForTest(m)
    # Create the images, save as an animated GIF.
    images = []
    s = 5
    img = Image.new("RGB", (s * n, s * n), (255, 255, 255))
    draw = ImageDraw.Draw(img)
    colors = [(255, 0, 0), (255, 255, 0), (0, 255, 0)]
    for step, (i, j) in enumerate(pairs):
        draw.rectangle((s * j, s * i, s * j + s - 2, s * i + s - 2), colors[cache.hit(i, j)])
        if not step % 17:
            images.append(img.copy())
    images += [img] * 40
    images[0].save(f"{func.__name__}_{m}.gif", save_all=True, append_images=images[1:], optimize=False, duration=30, loop=0)

def test_combinations(iterator: Iterable[Tuple[int, int]], n: int):
    # Note that this function is not suitable for large N.
    expected = set(frozenset(pair) for pair in itertools.combinations(range(n), 2))
    items = list(iterator)
    actual = set(frozenset(pair) for pair in items)
    assert len(actual) == len(items), f"{[item for item, count in Counter(items).items() if count > 1]}"
    assert actual == expected, f"dup={actual - expected}, missing={expected - actual}"

def test():
    n = 100  # N
    cache_size = 30  # M

    def run(func):
        func(n, cache_size)
        # Measure generation performance.
        started = time.perf_counter()
        for a, b in func(n, cache_size):
            pass
        elapsed = time.perf_counter() - started
        # Test generated combinations.
        test_combinations(func(n, cache_size), n)
        # Measure cache hit (load count) performance.
        cache = LRUCacheWithCounter(cache_size)
        _ = basic_loop(iterator=func(n, cache_size), cached_load=cache)
        print(f"{func.__name__}: {cache.load_count=}, {elapsed=}")

    candidates = [
        baseline,
        prioritizes,
        Matt_solution,
        Kelly_solution,
        Kelly_solution2,
        btilly_solution,
        btilly_solution2,
        btilly_solution3,
        btilly_solution4,
    ]
    for f in candidates:
        run(f)

def main():
    test()
    visualize()
    output_dir = Path("./temp2")
    benchmark(output_dir)
    plot_graphs(output_dir)

if __name__ == "__main__":
    main()
I have no problem with you not using the above test code or changing the behavior of basic_loop or LRUCacheWithCounter.
Additional notes:
- The score calculation cannot be pruned using neighbor scores.
- The score calculation cannot be pruned using only a portion of the item.
- It is impossible to guess where the best combination will be.
- Using faster media is one option, but I'm already at my limit, so I'm looking for a software solution.
Thank you for reading this long post to the end.
Edit:
Thanks to btilly's answer and the help of Kelly's visualization, I have come to the conclusion that btilly's solution is the best and possibly the optimal one. In short, btilly's approach processes the indexes in blocks of k = M - 2: it first generates all pairs within a block, then pairs every remaining index against that block, so that each loaded block is reused as much as possible.
Here is a theoretical explanation (although I am not very good at math, so it could be wrong).
Let N represent the number of indexes, M the cache size, and C the number of combinations (same as math.comb).
Consider a situation where the cache is full and no further combinations can be generated without loading. If we load a new index at this point, the only new combinations that can be generated are those between the newly loaded index and the M - 1 indexes remaining in the cache. This pattern holds for each subsequent load, so while the cache is full, at most M - 1 combinations can be generated per load.
The same logic holds while the cache is filling up. If M' indexes are currently in the cache, then the next index can generate at most M' combinations, the index after that at most M' + 1 combinations, and so forth. In total, at most C(M, 2) combinations can be generated before the cache is full.
Thus, to generate all C(N, 2) combinations, at least M loads are required to fill the cache, and at least (C(N,2) - C(M,2)) / (M - 1) loads are required after the cache is filled. From the above, the load-count complexity of this problem is Ω(N^2 / M).
I have plotted this formula as lower_bound in the graphs below. Note that it is only a lower bound; there is no guarantee that it can actually be achieved.
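For concreteness, the bound is easy to evaluate directly. Here is a minimal sketch (the same formula as calc_lower_bound in the code above), checked against the prioritizes results reported earlier:

import math

def lower_bound(n: int, m: int) -> int:
    # M loads to fill the cache, then at most M - 1 new combinations per load.
    return m + math.ceil((math.comb(n, 2) - math.comb(m, 2)) / (m - 1))

print(lower_bound(100, 10))  # -> 555 (prioritizes needed 1660 loads)
print(lower_bound(100, 50))  # -> 127 (prioritizes needed 155 loads)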
As an aside, Kelly's solution needs k to be tuned to maximize its performance. For M = 50% of N, the best k is about M * 2/3; for M = 30% of N, it's about M * 5/6. I couldn't figure out how to calculate the optimal value, though. As a general configuration, I use k = M - 2 (which is not the best, but relatively good) in Kelly_solution2 in the graphs below.
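For what it's worth, since iterating a candidate ordering is cheap compared to the actual loads, one pragmatic (if brute-force) way to pick k is to simulate every block size with the counting cache. A minimal sketch; kelly_k and best_k are hypothetical helpers built on LRUCacheWithCounter from the code above:

from itertools import chain, combinations, product

def kelly_k(n: int, k: int):
    # Kelly's block scheme with an explicit block size k.
    r = range(n)
    return chain.from_iterable(
        combinations(r[i : i + k], 2) if i == j else product(r[i : i + k], r[j : j + k])
        for i in r[::k]
        for j in r[i::k]
    )

def best_k(n: int, cache_size: int) -> int:
    # Simulate each block size and keep the one with the fewest loads.
    def loads(k: int) -> int:
        cache = LRUCacheWithCounter(cache_size)
        for a, b in kelly_k(n, k):
            cache(a)
            cache(b)
        return cache.load_count

    return min(range(2, cache_size + 1), key=loads)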
For M = 10% of N: [graph]
For M = 50% of N: [graph]
Note that, in these graphs, the load count looks like O(N), but this is only because I set M proportional to N; when M is fixed, it is O(N^2) as described above.
Here is an animation visualizing the cache hit rate of btilly_solution2, created with a modified version of Kelly's code. Each pixel represents a combination, with red representing combinations where both indexes are loaded, yellow where one index is loaded, and green where neither index is loaded. [animation]
In addition, since I'm looking for the optimal sequence, execution time doesn't matter much. But just in case anyone is curious, here is a comparison of execution times (iteration only). [graph]
btilly_solution4 (btilly's solution as modified by Kelly) is almost as fast as itertools.combinations, which should be optimal in this case. Note, however, that even without the modification, it took only 112 nanoseconds per combination.
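If you want to measure iteration time yourself, here is a minimal sketch using the deque trick suggested in the comments below to consume a generator with minimal Python-level overhead (absolute numbers will of course vary by machine):

import time
from collections import deque

def time_iteration(func, n: int, cache_size: int) -> float:
    # deque(iterable, maxlen=0) exhausts an iterator at C speed, discarding items.
    started = time.perf_counter()
    deque(func(n, cache_size), maxlen=0)
    return time.perf_counter() - started

# Example: time_iteration(btilly_solution4, 10_000, 1_000)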
That's it. Thanks to everyone involved.
Comments:
- […] calc_score function which is relatively fast. I'm guessing this calc_score function only needs some features of the items, and not the items as a whole? In this case, load the items, but instead of keeping them all in memory, only keep the relevant information which is needed by calc_score. – Nevil
- […] calc_score is fast enough. – Nevil
- […] calc_score? You said that calc_score(a, b) == calc_score(b, a), which is very good. Perhaps calc_score has other cool properties. Can calc_score represent something akin to geographic proximity, so that if (a, b) has a good score and (b, c) has a bad score, then (a, c) has a bad score? Or geographic distance, so that if (a, b) has a bad score and (b, c) has a bad score, then (a, c) has a bad score? Maybe the triangular inequality calc_score(a, b) + calc_score(b, c) >= calc_score(a, c) or calc_score(a, c) >= abs(calc_score(a, b) - calc_score(b, c))? – Nevil
- […] N(N+1)/2*2 = N(N+1). calc_score is user-defined (by me, to be exact), and it should be possible to change it in the future. So, I like to avoid adding constraints whenever possible. Currently, only commutativity can be guaranteed. calc_score is fast because the machine has powerful CPUs (and weak IO). – Manumission
- […] for index_a, index_b in ...:, i.e., you don't keep a reference to each pair. But your benchmark's for _ in func(n, cache_size): does, and that prevents optimizations, making some solutions look slower. You could do deque(func(n, cache_size), 0) instead of your loop; that would allow those optimizations again and also would be less overhead than a Python-level loop. Btw I'm curious how you changed our solutions to generators. That probably made them slower. Maybe your edit would better be an answer, with tested codes included. – Ketchan
- […] for a, b in func(...) should be the closest to my actual code, I will re-run the benchmark with it. However, I believe the difference in execution speed is mainly caused by something else, so I think it will remain the same in the big picture. – Manumission
- […] for a, b in func(n, cache_size): pass took me 15 seconds while deque(func(n, cache_size), 0) only took 2.4 seconds. So that's making it look over six times slower than it actually is. (Granted, its speed and the speeds of most of the solutions are likely insignificant compared to the actual work you do with the pairs. But I still think inflating the times with consumer-time is misleading.) – Ketchan
- […] baseline and Kelly_solution with deque(func(n, cache_size), 0), at N=100k, on my PC (not the same one I used above), I got 21 and 38 seconds, 17 seconds difference. So (ignoring the difference in cache hit rates) changing baseline to Kelly_solution will impact my program by 17 seconds? No. The comparison using a for-loop was 66 and 74 seconds, 8 seconds difference. Since I use a for-loop, 8 seconds difference would be closer to the actual impact on my program. – Manumission