Optimise utils.find and specialise utils.as_chunks

Author: Sacul
Date: 2025-11-19 08:51:27 +08:00
Committed by: GitHub
Parent: c342db8534
Commit: 9be91cb093
2 changed files with 16 additions and 13 deletions
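For context, a quick usage sketch of the two public helpers this commit touches, assuming a current discord.py install where utils.find and utils.as_chunks wrap the private _find and _chunk changed in the hunks below:

import discord

# find returns the first element for which the predicate is truthy, or None
first_even = discord.utils.find(lambda n: n % 2 == 0, [1, 3, 4, 5])
print(first_even)  # 4

# as_chunks yields lists of at most max_size items from any iterable
for page in discord.utils.as_chunks(range(10), 4):
    print(page)  # [0, 1, 2, 3], then [4, 5, 6, 7], then [8, 9]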


@@ -821,7 +821,7 @@ class GuildChannel:
         if obj.is_default():
             return base

-        overwrite = utils.get(self._overwrites, type=_Overwrites.ROLE, id=obj.id)
+        overwrite = utils.find(lambda ow: ow.type == _Overwrites.ROLE and ow.id == obj.id, self._overwrites)
         if overwrite is not None:
             base.handle_overwrite(overwrite.allow, overwrite.deny)
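A plausible reading of this hunk (my interpretation, not stated in the commit message): utils.get has to turn its keyword arguments into attribute lookups on every call, whereas utils.find is handed a ready-made predicate, so the lambda form does less per-call work. A minimal stand-alone sketch of the two styles on toy data; Overwrite, ROLE, and the helper bodies below are illustrative stand-ins, not discord.py internals:

from dataclasses import dataclass
from operator import attrgetter

ROLE = 0

@dataclass
class Overwrite:
    type: int
    id: int

overwrites = [Overwrite(type=i % 2, id=i) for i in range(1_000)]

def get(iterable, **attrs):
    # get()-style helper: builds attribute getters from keyword arguments on each call
    converted = [(attrgetter(attr), value) for attr, value in attrs.items()]
    for elem in iterable:
        if all(getter(elem) == value for getter, value in converted):
            return elem
    return None

def find(predicate, iterable):
    # find()-style helper: just applies the caller-supplied predicate
    return next(filter(predicate, iterable), None)

# Both styles locate the same element; find() skips the keyword handling.
assert get(overwrites, type=ROLE, id=42) is find(lambda ow: ow.type == ROLE and ow.id == 42, overwrites)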


@@ -56,6 +56,8 @@ from typing import (
     TYPE_CHECKING,
 )
 import unicodedata
+import collections.abc
+from itertools import islice
 from base64 import b64encode, b64decode
 from bisect import bisect_left
 import datetime
@@ -434,7 +436,7 @@ def time_snowflake(dt: datetime.datetime, /, *, high: bool = False) -> int:
 def _find(predicate: Callable[[T], Any], iterable: Iterable[T], /) -> Optional[T]:
-    return next((element for element in iterable if predicate(element)), None)
+    return next(filter(predicate, iterable), None)


 async def _afind(predicate: Callable[[T], Any], iterable: AsyncIterable[T], /) -> Optional[T]:
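The _find change swaps a generator expression for the built-in filter; the likely rationale (my reading, not spelled out in the commit) is that filter iterates in C and avoids the extra generator frame. A rough equivalence-and-timing sketch; absolute numbers will vary by machine and interpreter:

import timeit

data = list(range(10_000))
pred = lambda x: x == 9_999  # worst case: the match is at the very end

def find_genexp(predicate, iterable):
    return next((element for element in iterable if predicate(element)), None)

def find_filter(predicate, iterable):
    return next(filter(predicate, iterable), None)

# Both forms return the same element, or None when nothing matches
assert find_genexp(pred, data) == find_filter(pred, data) == 9_999
assert find_filter(pred, []) is None

print("genexp:", timeit.timeit(lambda: find_genexp(pred, data), number=1_000))
print("filter:", timeit.timeit(lambda: find_filter(pred, data), number=1_000))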
@@ -1037,17 +1039,18 @@ def escape_mentions(text: str) -> str:
 def _chunk(iterator: Iterable[T], max_size: int) -> Iterator[List[T]]:
-    ret = []
-    n = 0
-    for item in iterator:
-        ret.append(item)
-        n += 1
-        if n == max_size:
-            yield ret
-            ret = []
-            n = 0
-    if ret:
-        yield ret
+    # Specialise iterators that can be sliced as it is much faster
+    if isinstance(iterator, collections.abc.Sequence):
+        for i in range(0, len(iterator), max_size):
+            yield list(iterator[i : i + max_size])
+    else:
+        # Fallback to slower path
+        iterator = iter(iterator)
+        while True:
+            batch = list(islice(iterator, max_size))
+            if not batch:
+                break
+            yield batch


 async def _achunk(iterator: AsyncIterable[T], max_size: int) -> AsyncIterator[List[T]]:
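To try the new chunking behaviour outside the library, here is a stand-alone copy of the logic from the hunk above plus a quick check covering both paths. The function name chunk is mine; in discord.py this logic sits behind utils.as_chunks:

import collections.abc
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

def chunk(iterator: Iterable[T], max_size: int) -> Iterator[List[T]]:
    if isinstance(iterator, collections.abc.Sequence):
        # Sequences support slicing, so batches can be copied out directly.
        for i in range(0, len(iterator), max_size):
            yield list(iterator[i : i + max_size])
    else:
        # Generic iterables fall back to islice-based batching.
        iterator = iter(iterator)
        while True:
            batch = list(islice(iterator, max_size))
            if not batch:
                break
            yield batch

assert list(chunk(list(range(7)), 3)) == [[0, 1, 2], [3, 4, 5], [6]]         # sequence fast path
assert list(chunk((x for x in range(7)), 3)) == [[0, 1, 2], [3, 4, 5], [6]]  # iterator fallback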