Add support for chunking AsyncIterator objects

This commit is contained in:
Josh
2020-12-04 09:32:53 +10:00
committed by GitHub
parent a39c957ba5
commit f8e428bd5b
2 changed files with 45 additions and 0 deletions

View File

@ -62,6 +62,11 @@ class _AsyncIterator:
if ret:
return elem
def chunk(self, max_size):
if max_size <= 0:
raise ValueError('async iterator chunk sizes must be greater than 0.')
return _ChunkedAsyncIterator(self, max_size)
def map(self, func):
return _MappedAsyncIterator(self, func)
@ -92,6 +97,26 @@ class _AsyncIterator:
def _identity(x):
return x
class _ChunkedAsyncIterator(_AsyncIterator):
def __init__(self, iterator, max_size):
self.iterator = iterator
self.max_size = max_size
async def next(self):
ret = []
n = 0
while n < self.max_size:
try:
item = await self.iterator.next()
except NoMoreItems:
if ret:
return ret
raise
else:
ret.append(item)
n += 1
return ret
class _MappedAsyncIterator(_AsyncIterator):
def __init__(self, iterator, func):
self.iterator = iterator