API Reference

BatchProcessor

batch_processors.batchers.BatchProcessor

Bases: Generic[T, R]

A class for processing items in batches.

This class takes an iterable of items, processes them in batches using a provided function, and optionally saves progress to allow for checkpoint recovery.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `process_func` | `Callable[[T], R]` | The function used to process each item. |
| `batch_size` | `int` | The number of items to process in each batch. |
| `pickle_file` | `Optional[str]` | The file to use for saving/loading progress. |
| `processed_items` | `List[T]` | A list of items that have been processed. |
| `results` | `List[R]` | A list of results from processing the items. |
| `recover_from_checkpoint` | `bool` | Whether to attempt to recover from a checkpoint. |
| `use_tqdm` | `bool` | Whether to use tqdm progress bars. |
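
A minimal usage sketch (the `square` function and file paths below are hypothetical, not part of the library):

```python
from batch_processors.batchers import BatchProcessor

# Hypothetical per-item function: any Callable[[T], R] works.
def square(x: int) -> int:
    return x * x

processor = BatchProcessor(
    process_func=square,
    batch_size=10,
    pickle_file="progress.pkl",  # hypothetical path; enables checkpoint saves
    logfile="run.log",           # hypothetical path; enables file logging
)

items, results = processor.process_items_in_batches(range(25))
# items == [0, 1, ..., 24]; results == [0, 1, 4, ..., 576]
```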

Source code in `src/batch_processors/batchers.py`:

```python
class BatchProcessor(Generic[T, R]):
    """
    A class for processing items in batches.

    This class takes an iterable of items, processes them in batches using a provided
    function, and optionally saves progress to allow for checkpoint recovery.

    Attributes:
        process_func (Callable[[T], R]): The function used to process each item.
        batch_size (int): The number of items to process in each batch.
        pickle_file (Optional[str]): The file to use for saving/loading progress.
        processed_items (List[T]): A list of items that have been processed.
        results (List[R]): A list of results from processing the items.
        recover_from_checkpoint (bool): Whether to attempt to recover from a checkpoint.
        use_tqdm (bool): Whether to use tqdm progress bars.
    """

    def __init__(
        self,
        process_func: Callable[[T], R],
        batch_size: int = 100,
        pickle_file: Optional[str] = None,
        logfile: Optional[str] = None,
        recover_from_checkpoint: bool = False,
        use_tqdm: bool = False,
    ):
        """
        Initialize the BatchProcessor.

        Args:
            process_func (Callable[[T], R]): The function to process each item.
            batch_size (int, optional): The number of items to process in each batch. Defaults to 100.
            pickle_file (Optional[str], optional): The file to use for saving/loading progress. Defaults to None.
            logfile (Optional[str], optional): The file to use for logging. Defaults to None.
            recover_from_checkpoint (bool, optional): Whether to attempt to recover from a checkpoint. Defaults to False.
            use_tqdm (bool, optional): Whether to use tqdm progress bars. Defaults to False.
        """
        self.process_func = process_func
        self.batch_size = batch_size
        self.pickle_file = pickle_file
        self.processed_items: List[T] = []
        self.results: List[R] = []
        self.recover_from_checkpoint = recover_from_checkpoint
        self.use_tqdm = use_tqdm

        # Set up logging
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)
        self.logger.handlers = []  # Clear any existing handlers
        formatter = logging.Formatter("%(asctime)s - %(message)s")

        # File handler (if logfile is provided)
        if logfile:
            file_handler = logging.FileHandler(logfile)
            file_handler.setFormatter(formatter)
            self.logger.addHandler(file_handler)

        # Recover from checkpoint if enabled
        if self.recover_from_checkpoint:
            self.load_progress()

    def process_item(self, job_number: int, item: T) -> R:
        """
        Process a single item.

        Args:
            job_number (int): The number of the job being processed.
            item (T): The item to process.

        Returns:
            R: The result of processing the item.
        """
        result = self.process_func(item)
        self.logger.info(f"Processed job {job_number}: {item}")
        return result

    def process_batch(self, batch: List[T], batch_number: int, total_jobs: int):
        """
        Process a batch of items.

        Args:
            batch (List[T]): The batch of items to process.
            batch_number (int): The number of the current batch.
            total_jobs (int): The total number of jobs to process.
        """
        if self.use_tqdm:
            batch_results = [
                self.process_item(i, item)
                for i, item in enumerate(tqdm(batch, desc=f"Batch {batch_number}"))
            ]
        else:
            batch_results = [self.process_item(i, item) for i, item in enumerate(batch)]

        self.processed_items.extend(batch)
        self.results.extend(batch_results)

        if self.pickle_file:
            self.save_progress()

        completion_message = f"Batch {batch_number} completed. Total processed: {len(self.processed_items)}/{total_jobs}"
        print(completion_message)
        self.logger.info(completion_message)

    def load_progress(self):
        """
        Load progress from a checkpoint file if it exists.
        """
        if self.pickle_file and os.path.exists(self.pickle_file):
            with open(self.pickle_file, "rb") as f:
                data = pickle.load(f)
                self.processed_items = data["processed_items"]
                self.results = data["results"]
            self.logger.info(
                f"Recovered {len(self.processed_items)} items from checkpoint"
            )
        else:
            self.logger.info(
                "No checkpoint file found or checkpoint recovery not enabled"
            )

    def save_progress(self):
        """
        Save current progress to a checkpoint file.
        """
        with open(self.pickle_file, "wb") as f:
            pickle.dump(
                {"processed_items": self.processed_items, "results": self.results},
                f,
            )

    def process_items_in_batches(
        self, input_items: Iterable[T]
    ) -> Tuple[List[T], List[R]]:
        """
        Process all input items in batches.

        Args:
            input_items (Iterable[T]): The items to process.

        Returns:
            Tuple[List[T], List[R]]: A tuple containing the list of processed items and their results.
        """
        input_items = list(input_items)  # Convert iterable to list
        total_jobs = len(input_items)
        start_index = len(self.processed_items) if self.recover_from_checkpoint else 0

        for i in range(start_index, total_jobs, self.batch_size):
            batch = input_items[i : i + self.batch_size]
            self.process_batch(batch, i // self.batch_size + 1, total_jobs)

        return self.processed_items, self.results
```

__init__(process_func, batch_size=100, pickle_file=None, logfile=None, recover_from_checkpoint=False, use_tqdm=False)

Initialize the BatchProcessor.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `process_func` | `Callable[[T], R]` | The function to process each item. | *required* |
| `batch_size` | `int` | The number of items to process in each batch. | `100` |
| `pickle_file` | `Optional[str]` | The file to use for saving/loading progress. | `None` |
| `logfile` | `Optional[str]` | The file to use for logging. | `None` |
| `recover_from_checkpoint` | `bool` | Whether to attempt to recover from a checkpoint. | `False` |
| `use_tqdm` | `bool` | Whether to use tqdm progress bars. | `False` |

load_progress()

Load progress from a checkpoint file if it exists.
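
Because `__init__` calls `load_progress()` when `recover_from_checkpoint=True`, an interrupted run can be resumed simply by reconstructing the processor. A sketch, assuming a previous run saved to the hypothetical `progress.pkl`:

```python
from batch_processors.batchers import BatchProcessor

processor = BatchProcessor(
    process_func=lambda x: x * x,  # hypothetical per-item function
    batch_size=10,
    pickle_file="progress.pkl",    # hypothetical path from the interrupted run
    recover_from_checkpoint=True,  # triggers load_progress() during __init__
)

# Recovered items/results are kept; processing resumes after them.
items, results = processor.process_items_in_batches(range(25))
```

Note that recovery is positional: `process_items_in_batches` skips the first `len(processed_items)` inputs, so it assumes the same input sequence, in the same order, as the interrupted run.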


process_batch(batch, batch_number, total_jobs)

Process a batch of items.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `batch` | `List[T]` | The batch of items to process. | *required* |
| `batch_number` | `int` | The number of the current batch. | *required* |
| `total_jobs` | `int` | The total number of jobs to process. | *required* |

process_item(job_number, item)

Process a single item.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `job_number` | `int` | The number of the job being processed. | *required* |
| `item` | `T` | The item to process. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `R` | The result of processing the item. |


process_items_in_batches(input_items)

Process all input items in batches.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `input_items` | `Iterable[T]` | The items to process. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Tuple[List[T], List[R]]` | A tuple containing the list of processed items and their results. |


save_progress()

Save current progress to a checkpoint file.
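
As the class source above shows, the checkpoint is a plain pickle of a dict with `"processed_items"` and `"results"` keys, so it can be inspected directly (the path below is hypothetical):

```python
import pickle

with open("progress.pkl", "rb") as f:  # hypothetical checkpoint path
    data = pickle.load(f)

print(len(data["processed_items"]), len(data["results"]))
```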


AsyncBatchProcessor

batch_processors.batchers.AsyncBatchProcessor

Bases: BatchProcessor[T, R]

An asynchronous version of the BatchProcessor.

This class processes items in batches asynchronously, with optional concurrency limits.
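
A minimal async sketch. Two details are visible in the source below: `process_func` is awaited, so despite the `Callable[[T], R]` annotation it must be an async callable; and batch results are collected via `asyncio.as_completed`, so within a batch they are appended in completion order, which may differ from input order. The `double` coroutine and the concurrency limit are hypothetical:

```python
import asyncio
from batch_processors.batchers import AsyncBatchProcessor

# Hypothetical async per-item function; it is awaited internally.
async def double(x: int) -> int:
    await asyncio.sleep(0.01)  # stand-in for real async work (e.g. an API call)
    return 2 * x

async def main() -> None:
    processor = AsyncBatchProcessor(
        process_func=double,
        batch_size=10,
        max_concurrent=5,  # at most 5 items in flight at once
    )
    items, results = await processor.process_items_in_batches(range(25))

asyncio.run(main())
```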

Source code in `src/batch_processors/batchers.py`:

```python
class AsyncBatchProcessor(BatchProcessor[T, R]):
    """
    An asynchronous version of the BatchProcessor.

    This class processes items in batches asynchronously, with optional concurrency limits.
    """

    def __init__(
        self,
        process_func: Callable[[T], R],
        batch_size: int = 100,
        pickle_file: Optional[str] = None,
        logfile: Optional[str] = None,
        recover_from_checkpoint: bool = False,
        max_concurrent: Optional[int] = None,
        use_tqdm: bool = False,
    ):
        """
        Initialize the AsyncBatchProcessor.

        Args:
            process_func (Callable[[T], R]): The function to process each item.
            batch_size (int, optional): The number of items to process in each batch. Defaults to 100.
            pickle_file (Optional[str], optional): The file to use for saving/loading progress. Defaults to None.
            logfile (Optional[str], optional): The file to use for logging. Defaults to None.
            recover_from_checkpoint (bool, optional): Whether to attempt to recover from a checkpoint. Defaults to False.
            max_concurrent (Optional[int], optional): The maximum number of concurrent operations. Defaults to None.
            use_tqdm (bool, optional): Whether to use tqdm progress bars. Defaults to False.
        """
        super().__init__(
            process_func,
            batch_size,
            pickle_file,
            logfile,
            recover_from_checkpoint,
            use_tqdm,
        )
        self.semaphore = (
            asyncio.Semaphore(max_concurrent) if max_concurrent is not None else None
        )

    async def process_item(self, job_number: int, item: T) -> R:
        """
        Process a single item asynchronously.

        Args:
            job_number (int): The number of the job being processed.
            item (T): The item to process.

        Returns:
            R: The result of processing the item.
        """

        async def _process():
            result = await self.process_func(item)
            self.logger.info(f"Processed job {job_number}: {item}")
            return result

        if self.semaphore:
            async with self.semaphore:
                return await _process()
        else:
            return await _process()

    async def process_batch(self, batch: List[T], batch_number: int, total_jobs: int):
        """
        Process a batch of items asynchronously.

        Args:
            batch (List[T]): The batch of items to process.
            batch_number (int): The number of the current batch.
            total_jobs (int): The total number of jobs to process.
        """
        if self.use_tqdm:
            pbar = tqdm(total=len(batch), desc=f"Batch {batch_number}")

        tasks = [
            self.process_item(i + (batch_number - 1) * self.batch_size, item)
            for i, item in enumerate(batch)
        ]

        batch_results = []
        for task in asyncio.as_completed(tasks):
            result = await task
            batch_results.append(result)
            if self.use_tqdm:
                pbar.update(1)

        if self.use_tqdm:
            pbar.close()

        self.processed_items.extend(batch)
        self.results.extend(batch_results)

        if self.pickle_file:
            self.save_progress()

        self.logger.info(
            f"Batch {batch_number} completed. Total processed: {len(self.processed_items)}/{total_jobs}"
        )

    async def process_items_in_batches(
        self, input_items: Iterable[T]
    ) -> Tuple[List[T], List[R]]:
        """
        Process all input items in batches asynchronously.

        Args:
            input_items (Iterable[T]): The items to process.

        Returns:
            Tuple[List[T], List[R]]: A tuple containing the list of processed items and their results.
        """
        input_items = list(input_items)  # Convert iterable to list
        total_jobs = len(input_items)
        start_index = len(self.processed_items) if self.recover_from_checkpoint else 0

        for i in range(start_index, total_jobs, self.batch_size):
            batch = input_items[i : i + self.batch_size]
            await self.process_batch(batch, i // self.batch_size + 1, total_jobs)

        return self.processed_items, self.results
```

__init__(process_func, batch_size=100, pickle_file=None, logfile=None, recover_from_checkpoint=False, max_concurrent=None, use_tqdm=False)

Initialize the AsyncBatchProcessor.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `process_func` | `Callable[[T], R]` | The function to process each item. | *required* |
| `batch_size` | `int` | The number of items to process in each batch. | `100` |
| `pickle_file` | `Optional[str]` | The file to use for saving/loading progress. | `None` |
| `logfile` | `Optional[str]` | The file to use for logging. | `None` |
| `recover_from_checkpoint` | `bool` | Whether to attempt to recover from a checkpoint. | `False` |
| `max_concurrent` | `Optional[int]` | The maximum number of concurrent operations. | `None` |
| `use_tqdm` | `bool` | Whether to use tqdm progress bars. | `False` |

async process_batch(batch, batch_number, total_jobs)

Process a batch of items asynchronously.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `batch` | `List[T]` | The batch of items to process. | *required* |
| `batch_number` | `int` | The number of the current batch. | *required* |
| `total_jobs` | `int` | The total number of jobs to process. | *required* |

async process_item(job_number, item)

Process a single item asynchronously.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `job_number` | `int` | The number of the job being processed. | *required* |
| `item` | `T` | The item to process. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `R` | The result of processing the item. |


async process_items_in_batches(input_items)

Process all input items in batches asynchronously.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `input_items` | `Iterable[T]` | The items to process. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Tuple[List[T], List[R]]` | A tuple containing the list of processed items and their results. |

