Producer–Consumer

pc – generic producer–consumer

The pc module defines the classes needed to solve the multi-threaded producer–consumer problem.

The usual use case is to create some kind of thread-safe generator (usually shared by Producer instances). Each producer obtains a single value in Producer.produce(), processes it and returns the result. Obtaining the value must be thread-safe, because several producers can request a value at the same time.

Once returned, the result is consumed by Consumer instances, which can store it in some kind of thread-safe result object (several consumers may store values at the same time).
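A minimal sketch of the thread-safe generator and thread-safe result described above, using only the standard library. The class and method names (ThreadSafeGenerator, ThreadSafeResult, next_value, store) are hypothetical and not part of summer; for brevity, one worker function both obtains and stores values instead of using separate producer and consumer threads.

```python
import threading
from typing import Any, Iterable, Iterator, List


class ThreadSafeGenerator:
    """Hypothetical wrapper letting several threads pull values from one iterable.

    A plain generator can raise ValueError ("generator already executing")
    if two threads call next() on it at the same time, so next() is locked.
    """

    def __init__(self, iterable: Iterable[Any]) -> None:
        self._iterator: Iterator[Any] = iter(iterable)
        self._lock = threading.Lock()

    def next_value(self) -> Any:
        # Raises StopIteration once the input is exhausted.
        with self._lock:
            return next(self._iterator)


class ThreadSafeResult:
    """Hypothetical container that several threads can store values into."""

    def __init__(self) -> None:
        self._items: List[Any] = []
        self._lock = threading.Lock()

    def store(self, value: Any) -> None:
        with self._lock:
            self._items.append(value)

    def items(self) -> List[Any]:
        with self._lock:
            return list(self._items)


def worker(source: ThreadSafeGenerator, result: ThreadSafeResult) -> None:
    # Obtain a value (thread-safe), process it, store the result (thread-safe).
    while True:
        try:
            value = source.next_value()
        except StopIteration:
            return
        result.store(value * value)


source = ThreadSafeGenerator(range(100))
result = ThreadSafeResult()
threads = [threading.Thread(target=worker, args=(source, result)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(result.items())[:5])  # [0, 1, 4, 9, 16]
```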

The ProducerConsumer engine manages the synchronization between ProducerThread and ConsumerThread threads and passes the objects returned by Producer.produce() to Consumer.consume().
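To illustrate the kind of synchronization such an engine performs, here is a minimal sketch built on queue.Queue. It is not summer's implementation: run_producer_consumer and the convention that produce() raises StopIteration when the input is exhausted are assumptions made for this example.

```python
import queue
import threading
from typing import Any, Callable, List

_DONE = object()  # sentinel telling a consumer thread to stop


def run_producer_consumer(
    produce: Callable[[], Any],
    consume: Callable[[Any], None],
    producers: int = 2,
    consumers: int = 2,
) -> None:
    """Hypothetical engine: call produce() until it raises StopIteration and
    hand every produced object to consume() on a consumer thread."""
    handoff = queue.Queue(maxsize=100)  # bounded: producers wait if consumers lag

    def producer_loop() -> None:
        while True:
            try:
                obj = produce()
            except StopIteration:
                return
            handoff.put(obj)  # blocks while the queue is full

    def consumer_loop() -> None:
        while True:
            obj = handoff.get()
            if obj is _DONE:
                return
            consume(obj)

    producer_threads = [threading.Thread(target=producer_loop) for _ in range(producers)]
    consumer_threads = [threading.Thread(target=consumer_loop) for _ in range(consumers)]
    for t in producer_threads + consumer_threads:
        t.start()
    for t in producer_threads:
        t.join()
    # All producers are finished: send one sentinel per consumer.
    for _ in consumer_threads:
        handoff.put(_DONE)
    for t in consumer_threads:
        t.join()


# Usage: several producers share one iterator, so access to it is locked.
lock = threading.Lock()
numbers = iter(range(10))
results: List[int] = []


def produce() -> int:
    with lock:
        return next(numbers) * 10


def consume(obj: int) -> None:
    with lock:
        results.append(obj)


run_producer_consumer(produce, consume)
print(sorted(results))
```

The bounded queue makes producers wait when consumers fall behind, and one sentinel per consumer tells each consumer thread to exit once all producers have finished.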

The caller usually needs to:

  1. override Producer.produce()

  2. override Consumer.consume()

  3. define some kind of thread-safe generator if required for Producer instances to obtain initial objects

  4. define some kind of thread-safe result if required for Consumer instances to store results
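The four steps might look like this. Producer and Consumer below are hypothetical stand-ins so the sketch runs without summer installed (the real base classes may differ), SharedInput and SharedResult are illustrative names, and a plain loop replaces the ProducerConsumer engine that would normally run these objects in separate threads.

```python
import threading
from typing import Any, Iterable, List


# Hypothetical stand-ins for summer.pc.Producer / summer.pc.Consumer.
class Producer:
    def produce(self) -> Any:
        raise NotImplementedError


class Consumer:
    def consume(self, produced_object: Any) -> None:
        raise NotImplementedError


class SharedInput:
    """Step 3: a thread-safe source of initial objects."""

    def __init__(self, values: Iterable[str]) -> None:
        self._it = iter(values)
        self._lock = threading.Lock()

    def take(self) -> str:
        with self._lock:
            return next(self._it)  # raises StopIteration when exhausted


class SharedResult:
    """Step 4: a thread-safe place for consumers to store results."""

    def __init__(self) -> None:
        self.lengths: List[int] = []
        self._lock = threading.Lock()

    def add(self, value: int) -> None:
        with self._lock:
            self.lengths.append(value)


class WordLengthProducer(Producer):
    """Step 1: obtain one value and process it."""

    def __init__(self, source: SharedInput) -> None:
        self.source = source

    def produce(self) -> int:
        return len(self.source.take())


class StoringConsumer(Consumer):
    """Step 2: store each produced object."""

    def __init__(self, result: SharedResult) -> None:
        self.result = result

    def consume(self, produced_object: int) -> None:
        self.result.add(produced_object)


source = SharedInput(["alpha", "be", "gamma"])
result = SharedResult()
producer, consumer = WordLengthProducer(source), StoringConsumer(result)
# Single-threaded stand-in for the engine's loop.
while True:
    try:
        consumer.consume(producer.produce())
    except StopIteration:
        break
print(result.lengths)  # [5, 2, 5]
```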

You can look in summer.tests.pctest.ProducerConsumerTest for inspiration.

See the summer.pcg module for a specific producer–consumer implementation that takes an iterable as input: several producers iterate over it and produce values that are consumed by several consumers.

The thread count is determined from the target hardware (number of available cores + 1). You can experiment with this value: for I/O-heavy tasks you can usually get better performance by increasing the number of threads (as a rule of thumb, try twice the number of cores).
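The sizing rule can be expressed as a small helper. default_thread_count is a hypothetical function, not summer's code; it only shows the arithmetic of cores + 1 versus twice the number of cores.

```python
import os


def default_thread_count(io_bound: bool = False) -> int:
    """Hypothetical helper: cores + 1 by default, twice the cores for I/O-heavy work.

    os.cpu_count() can return None, so fall back to a single core.
    """
    cores = os.cpu_count() or 1
    return cores * 2 if io_bound else cores + 1


print(default_thread_count(), default_thread_count(io_bound=True))
```

Keep in mind that under CPython's global interpreter lock, additional threads mostly pay off when the work releases the lock, as blocking I/O does.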

pcg – producer–consumer with generator

The pcg module is a more specific producer–consumer implementation for a common use case: iterating in parallel over a collection of input values and invoking an operation for each item.

Typical usage:

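```python
from __future__ import annotations  # Progress is only an annotation; its import is not shown

from summer.pc import Consumer
from summer.pcg import ProducerConsumerWithGenerator, ProducerWithGenerator


class MyConsumer(Consumer):

    def __init__(self, progress: Progress):
        self.progress = progress

    def consume(self, produced_object):
        # do whatever is required with produced_object
        ...
        # indicate progress, for example to some GUI listener (progress bar, ...)
        self.progress.next_step()


if __name__ == "__main__":
    iterable = list(...)  # your input values
    progress = ...        # your Progress instance
    consumer = MyConsumer(progress)
    pcg = ProducerConsumerWithGenerator(iterable, ProducerWithGenerator(), consumer)
    pcg.run()
```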

Producer is replaced with ProducerWithGenerator, which can usually be left as is: it automatically iterates over the provided iterable, returning one value at a time. You can override its produce_from_slice() method, which takes a single argument: the current iterator value.
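A rough sketch of what overriding produce_from_slice() could look like. ProducerWithGeneratorSketch is a hypothetical stand-in for ProducerWithGenerator, not its actual code, and unlike the usage above it receives the iterable in its constructor rather than through ProducerConsumerWithGenerator.

```python
import threading
from typing import Any, Iterable


class ProducerWithGeneratorSketch:
    """Hypothetical stand-in: produce() pulls the next item from a shared
    iterator and passes it to produce_from_slice(), which subclasses override."""

    def __init__(self, iterable: Iterable[Any]) -> None:
        self._iterator = iter(iterable)
        self._lock = threading.Lock()

    def produce(self) -> Any:
        with self._lock:  # several producer threads share one iterator
            item = next(self._iterator)  # StopIteration ends production
        return self.produce_from_slice(item)

    def produce_from_slice(self, item: Any) -> Any:
        # Default: pass the current iterator value through unchanged.
        return item


class UpperCaseProducer(ProducerWithGeneratorSketch):
    def produce_from_slice(self, item: str) -> str:
        return item.upper()


producer = UpperCaseProducer(["a", "b", "c"])
print([producer.produce() for _ in range(3)])  # ['A', 'B', 'C']
```

Only the next() call runs under the lock; the per-item work in produce_from_slice() runs outside it, so several producers can process items in parallel.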

You can also use the summer.utils.chunks() function to split a large collection into smaller ones and produce chunks of data, which reduces contention when many producers iterate over a single shared iterator. summer.pc.Consumer then consumes whole chunks rather than single items, which may improve performance.
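A sketch of chunked processing. The chunks() helper here is a hypothetical re-implementation with itertools.islice, and summer.utils.chunks() may differ in signature and return type; ChunkSumConsumer is likewise illustrative and does not derive from summer.pc.Consumer.

```python
import threading
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunks(iterable: Iterable[T], size: int) -> Iterator[List[T]]:
    """Hypothetical helper: yield consecutive lists of at most `size` items."""
    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


class ChunkSumConsumer:
    """Illustrative consumer that processes a whole chunk per call."""

    def __init__(self) -> None:
        self.total = 0
        self._lock = threading.Lock()

    def consume(self, chunk: List[int]) -> None:
        partial = sum(chunk)  # work on the whole chunk without holding the lock
        with self._lock:      # one lock acquisition per chunk, not per item
            self.total += partial


consumer = ChunkSumConsumer()
batches = list(chunks(range(10), 4))
print(batches)  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
threads = [threading.Thread(target=consumer.consume, args=(b,)) for b in batches]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(consumer.total)  # 45
```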