Producer–Consumer
pc – generic producer–consumer
The pc module defines the classes needed to solve the multi-threaded producer–consumer problem.
The usual use case is to create some kind of thread-safe generator, typically shared by Producer instances. Each call to Producer.produce() obtains a single value from it (obtaining the value must be thread-safe, because several producers can request a value at the same time), processes the value and returns the result.
Once returned, the result can be consumed by Consumer instances, which can store their results in some kind of thread-safe result object (several consumers may store values at the same time).
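A minimal sketch of a thread-safe generator and a thread-safe result, using only the standard library. `ThreadSafeIterator`, `ThreadSafeResult` and `worker` are hypothetical names, not part of summer. The lock around `next()` matters because a plain Python generator raises `ValueError: generator already executing` when two threads try to advance it at the same time.

```python
import threading


class ThreadSafeIterator:
    """Hypothetical wrapper: serializes next() calls on a shared iterator."""

    def __init__(self, iterable):
        self._iterator = iter(iterable)
        self._lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):
        # only one thread at a time may advance the underlying iterator
        with self._lock:
            return next(self._iterator)


class ThreadSafeResult:
    """Hypothetical result container: every access is guarded by a lock."""

    def __init__(self):
        self._items = []
        self._lock = threading.Lock()

    def add(self, item):
        with self._lock:
            self._items.append(item)

    def items(self):
        with self._lock:
            return list(self._items)


def worker(source, result):
    # each worker pulls values until the shared iterator is exhausted
    for value in source:
        result.add(value * value)


if __name__ == "__main__":
    source = ThreadSafeIterator(range(100))
    result = ThreadSafeResult()
    threads = [threading.Thread(target=worker, args=(source, result)) for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(sorted(result.items()) == [i * i for i in range(100)])  # True
```

Every input value is handed out exactly once, regardless of which thread happens to take it.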
The ProducerConsumer engine manages the necessary synchronization among ProducerThread and ConsumerThread threads and passes the objects produced by Producer.produce() to Consumer.consume().
The caller should usually:
- override Producer.produce(),
- override Consumer.consume(),
- define some kind of thread-safe generator, if required, for Producer instances to obtain initial objects,
- define some kind of thread-safe result, if required, for Consumer instances to store results.
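The following is a minimal, self-contained sketch of how such an engine might coordinate producer and consumer threads through a `queue.Queue`. It does not reproduce summer's actual ProducerConsumer implementation: `MiniProducerConsumer`, `SquareProducer`, `CollectingConsumer`, the convention that `produce()` raises `StopIteration` when the input is exhausted, and the sentinel-based shutdown are all assumptions made for illustration. The four steps listed above map onto it directly: `produce()` and `consume()` are overridden, and both sides use a locked iterator and a locked result list.

```python
import queue
import threading

_DONE = object()  # sentinel telling a consumer thread to stop


class Producer:
    def produce(self):
        """Return one produced object; raise StopIteration when input runs out."""
        raise NotImplementedError


class Consumer:
    def consume(self, produced_object):
        raise NotImplementedError


class MiniProducerConsumer:
    """Hypothetical engine, not summer's ProducerConsumer."""

    def __init__(self, producer, consumer, producer_count=2, consumer_count=2):
        self.producer = producer
        self.consumer = consumer
        self.producer_count = producer_count
        self.consumer_count = consumer_count
        self._queue = queue.Queue(maxsize=100)  # bounded hand-off between threads

    def _produce_loop(self):
        while True:
            try:
                obj = self.producer.produce()
            except StopIteration:
                return
            self._queue.put(obj)  # blocks while the queue is full

    def _consume_loop(self):
        while True:
            obj = self._queue.get()  # blocks while the queue is empty
            if obj is _DONE:
                return
            self.consumer.consume(obj)

    def run(self):
        producers = [threading.Thread(target=self._produce_loop) for _ in range(self.producer_count)]
        consumers = [threading.Thread(target=self._consume_loop) for _ in range(self.consumer_count)]
        for t in producers + consumers:
            t.start()
        for t in producers:
            t.join()
        # all real objects are queued; send one sentinel per consumer thread
        for _ in consumers:
            self._queue.put(_DONE)
        for t in consumers:
            t.join()


class SquareProducer(Producer):
    def __init__(self, numbers):
        self._numbers = iter(numbers)
        self._lock = threading.Lock()  # thread-safe access to the shared input

    def produce(self):
        with self._lock:
            n = next(self._numbers)  # raises StopIteration when exhausted
        return n * n  # processing happens outside the lock


class CollectingConsumer(Consumer):
    def __init__(self):
        self.results = []
        self._lock = threading.Lock()  # thread-safe result

    def consume(self, produced_object):
        with self._lock:
            self.results.append(produced_object)


if __name__ == "__main__":
    consumer = CollectingConsumer()
    MiniProducerConsumer(SquareProducer(range(10)), consumer).run()
    print(sorted(consumer.results))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

The bounded queue keeps fast producers from running arbitrarily far ahead of slow consumers, and because the sentinels are queued only after every producer has finished, each consumer drains all real objects before it stops.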
See summer.tests.pctest.ProducerConsumerTest for inspiration.
Please look at the summer.pcg module for a specific implementation of producer–consumer that uses an iterable as input: several producers iterate over it and produce values, which are consumed by several consumers.
The thread count is determined based on the target hardware (number of available cores + 1). You can experiment with this value: for I/O-heavy tasks you can usually get better performance by increasing the number of threads (as a rule of thumb, try twice the number of cores).
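Both heuristics can be computed with the standard library, as sketched below. The function names are hypothetical and this is not necessarily how summer itself reads the core count. `os.cpu_count()` may return `None`, hence the fallback to 1; it also reports the machine's logical CPUs, which can be more than the process is actually allowed to use.

```python
import os


def default_thread_count():
    # heuristic from the text: number of available cores + 1
    return (os.cpu_count() or 1) + 1


def io_bound_thread_count():
    # rule of thumb from the text for I/O-heavy tasks: twice the number of cores
    return 2 * (os.cpu_count() or 1)


if __name__ == "__main__":
    print(default_thread_count(), io_bound_thread_count())
```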
pcg – producer–consumer with generator
The pcg module is a more specific producer–consumer implementation based on a common use case: iterating in parallel over a collection of input values and invoking an operation for each item.
Typical usage:
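```python
from summer.pc import Consumer
from summer.pcg import ProducerConsumerWithGenerator, ProducerWithGenerator


class MyConsumer(Consumer):

    def __init__(self, progress):
        # progress: any object with a next_step() method, e.g. a Progress instance
        self.progress = progress

    def consume(self, produced_object):
        # do whatever is required with produced_object
        ...
        # indicate progress -- for example to some GUI listener (progress bar, ...)
        self.progress.next_step()


if __name__ == "__main__":
    iterable = list(...)  # your input values
    progress = ...  # your progress listener
    consumer = MyConsumer(progress)
    pcg = ProducerConsumerWithGenerator(iterable, ProducerWithGenerator(), consumer)
    pcg.run()
```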
Producer is replaced with ProducerWithGenerator, which can usually be left as is: it automatically iterates over the provided iterable, returning one value at a time. You can override the ProducerWithGenerator.produce_from_slice() method, which takes a single argument: the current iterator value.
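A minimal sketch of this pattern, assuming the generator-backed producer takes the next value from a shared iterator under a lock and passes it to an overridable `produce_from_slice()` hook whose default returns the value unchanged. `GeneratorProducerSketch`, `UpperCaseProducer` and `drain` are hypothetical; in the typical usage above the iterable is passed to ProducerConsumerWithGenerator rather than to the producer, and summer's internals may differ.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class GeneratorProducerSketch:
    """Hypothetical stand-in for a generator-backed producer."""

    def __init__(self, iterable):
        self._iterator = iter(iterable)
        self._lock = threading.Lock()

    def produce(self):
        # take exactly one value from the shared iterator; StopIteration ends production
        with self._lock:
            value = next(self._iterator)
        return self.produce_from_slice(value)

    def produce_from_slice(self, value):
        # default hook: pass the current iterator value through unchanged
        return value


class UpperCaseProducer(GeneratorProducerSketch):
    def produce_from_slice(self, value):
        # override: transform each item before it reaches the consumers
        return value.upper()


def drain(producer):
    # call produce() until the shared iterator is exhausted
    out = []
    while True:
        try:
            out.append(producer.produce())
        except StopIteration:
            return out


if __name__ == "__main__":
    producer = UpperCaseProducer(["a", "b", "c", "d"])
    with ThreadPoolExecutor(max_workers=3) as pool:
        parts = list(pool.map(lambda _: drain(producer), range(3)))
    print(sorted(v for part in parts for v in part))  # ['A', 'B', 'C', 'D']
```

Only the `next()` call is serialized; the work in `produce_from_slice()` runs concurrently across threads.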
You can also use the summer.utils.chunks() function to split a large collection into smaller ones and produce chunks of data, which reduces contention among producers iterating over a single shared iterator. Your summer.pc.Consumer subclass then consumes whole chunks rather than single items, which may improve performance.
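The effect of chunking can be sketched with a small helper that splits a list into fixed-size pieces: each worker takes one chunk from the shared iterator instead of one item, so the lock is acquired roughly `len(data) / size` times instead of `len(data)` times. The `chunks()` below is a hypothetical stand-in; the real summer.utils.chunks() signature is not shown in this document and may differ. `ChunkSumConsumer` is likewise hypothetical.

```python
import threading


def chunks(items, size):
    """Hypothetical helper: yield successive lists of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


class ChunkSumConsumer:
    """Consumes a whole chunk per call and accumulates a thread-safe total."""

    def __init__(self):
        self.total = 0
        self._lock = threading.Lock()

    def consume(self, chunk):
        partial = sum(chunk)  # per-chunk work done without holding the lock
        with self._lock:
            self.total += partial


if __name__ == "__main__":
    data = list(range(1, 101))
    source = chunks(data, 10)
    source_lock = threading.Lock()
    consumer = ChunkSumConsumer()

    def worker():
        while True:
            with source_lock:  # one lock acquisition per chunk, not per item
                chunk = next(source, None)
            if chunk is None:
                return
            consumer.consume(chunk)

    threads = [threading.Thread(target=worker) for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(consumer.total)  # 5050
```

Larger chunks mean fewer lock acquisitions but coarser load balancing: with very few chunks, some threads may sit idle while others finish the last ones.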