All threads receive the same result from one function

Imagine you want to broadcast data produced by a generator. There are many consumers that receive and consume the data. The generator produces data regardless of whether any consumer is present, and every consumer that joins should receive the data from the moment it joined. The obvious solution is the observer pattern, but what if the number of consumers is very high and the rate of data generation is also very high? In that case, while data item 1 is still being broadcast to all observers, data item 2 is already ready before the broadcast of item 1 has finished. I am wondering whether there is a solution to this problem. One thing I have been thinking about: is it possible for several threads to execute the same function in a shared space? In other words, is it possible to run the function once so that all threads receive the result together?
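To make the observer-pattern idea concrete, this is roughly what I mean (`Subject` and the callback names are just illustrative); notice that `publish` delivers each item to the observers one at a time, which is exactly where the backlog builds up:

```python
class Subject:
    """Hypothetical observer-pattern subject: notifies callbacks one by one."""
    def __init__(self):
        self._observers = []

    def subscribe(self, callback):
        self._observers.append(callback)

    def publish(self, data):
        # Sequential broadcast: a slow observer here delays delivery of
        # the next item to everyone.
        for callback in self._observers:
            callback(data)

received = {0: [], 1: []}
subject = Subject()
subject.subscribe(lambda d: received[0].append(d))
subject.subscribe(lambda d: received[1].append(d))

for item in range(3):
    subject.publish(item)

print(received)  # {0: [0, 1, 2], 1: [0, 1, 2]}
```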

Look at this code:

import threading
from threading import Event
from time import sleep

class data_provider:

    def __init__(self) -> None:
        self.cnt = 0
        self.stop = False
        self.data_received = Event()

        self.data_received.clear()

        data_generator_thread = threading.Thread(target= self.data_generator)
        data_generator_thread.start()

    def data_sender(self):

        self.data_received.wait()
        
        while not self.stop:
            yield { 'Name': 'Data', 'Count': self.cnt}
            self.data_received.clear()

            self.data_received.wait()



    def data_generator(self):
        while self.cnt < 20:
            self.cnt += 1
            self.data_received.set()
            sleep(1)

        self.stop = True
        self.data_received.set()

and main.py:

from threading import Thread
# data_provider as defined above

obj = data_provider()

def printing(id):
    for data in obj.data_sender():
        print(str(id) + ' : ' + str(data))

our_threads = []

for i in range(100):
    our_threads.append(Thread(target= printing, args=(i,)))
    our_threads[i].start()

for i in range(100):
    our_threads[i].join()

print('End')

What I want: all threads effectively run data_sender() once, and every thread receives the same result as soon as the data becomes available.

Any idea?
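To show why naively sharing one generator object between the threads does not broadcast: each next() call hands a value to exactly one thread, so the stream gets split rather than duplicated (and concurrent next() calls on the same generator can raise ValueError: generator already executing, hence the lock in this sketch):

```python
import threading

def gen():
    for n in range(100):
        yield n

shared = gen()          # ONE generator object shared by all threads
lock = threading.Lock()
seen = [[] for _ in range(3)]

def consume(slot):
    while True:
        with lock:  # serialize next(); a running generator cannot be re-entered
            item = next(shared, None)
        if item is None:
            return
        seen[slot].append(item)

threads = [threading.Thread(target=consume, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Each value was delivered exactly once overall, not once per thread:
print(sum(len(s) for s in seen))  # 100, not 300
```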

  • Give each consumer a queue it reads from, and have the producer write into each queue?

  • Is it necessary that all consumers receive all data or can some data be thrown away to stay up to date with newest data?

  • It is necessary for all consumers to receive all data from the moment they joined.

  • Well, another option is a single circular buffer into which the producer writes and from which everyone else reads.

  • A linked list may be useful. The producer holds a reference to the last node of the list, and each consumer holds a reference to the last node it has processed; no references to prior nodes are kept. The producer uses a Condition to notify_all when new data is appended. Each consumer can follow the linked list independently; old, unused nodes are removed by garbage collection.

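The shared-buffer/circular-buffer suggestion from the comments can be sketched with a growing list plus a Condition; the name `Broadcast` and its methods are hypothetical, and for brevity this version never trims consumed entries (trimming is what the linked-list approach in the answer automates):

```python
import threading

class Broadcast:
    """Hypothetical sketch: one shared buffer, each consumer keeps its own index."""
    def __init__(self):
        self._items = []
        self._done = False
        self._cond = threading.Condition()

    def publish(self, item):
        with self._cond:
            self._items.append(item)
            self._cond.notify_all()  # wake all waiting consumers

    def close(self):
        with self._cond:
            self._done = True
            self._cond.notify_all()

    def read_from(self, index):
        """Yield items starting at `index`; returns once closed and drained."""
        while True:
            with self._cond:
                self._cond.wait_for(lambda: len(self._items) > index or self._done)
                if index >= len(self._items):
                    return
                item = self._items[index]
            index += 1
            yield item  # yielded outside the lock so readers don't block each other

b = Broadcast()
results = [[], []]

def consumer(slot):
    for item in b.read_from(0):
        results[slot].append(item)

threads = [threading.Thread(target=consumer, args=(i,)) for i in range(2)]
for t in threads:
    t.start()
for n in range(5):
    b.publish(n)
b.close()
for t in threads:
    t.join()
print(results)  # both consumers saw all five items, in order
```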
If you are looking to have a single producer of some data and multiple “consumers” of that data, then the following code uses Condition instances with notification. For demo purposes the producer only produces 10 pieces of data and we have 3 consumers, which keeps the output to a reasonable length:

Update: Using a Linked List

import threading

class Node:
    def __init__(self, data, previous_node):
        self.consumed_count = 0
        self.next = None
        self.data = data
        if previous_node:
            previous_node.next = self

    def __repr__(self):
        return f'[data: {repr(self.data)}, consumed_count: {self.consumed_count}, next: {self.next}]'

class data_provider:
    def __init__(self, num_consumers) -> None:
        self.num_consumers = num_consumers
        self.lock = threading.Lock()
        self.condition = threading.Condition()
        self.running = True
        # To simplify the code, the first node in the list is a dummy:
        self.linked_list = Node(None, None)

        data_generator_thread = threading.Thread(target=self.data_generator)
        data_generator_thread.start()

    def data_generator(self):
        import time

        last_node = self.linked_list

        for cnt in range(1, 11):  # Reduced count for demo purposes
            # For demo purposes let's introduce a pause:
            time.sleep(.5)
            last_node = Node({'Name': 'Data', 'Count': cnt}, last_node)
            with self.condition:
                self.condition.notify_all()

        print('Done producing')
        # Let consumers know that no more data will be coming:
        with self.condition:
            self.running = False
            self.condition.notify_all()

    def remove_consumed_nodes(self):
        with self.lock:
            # Remove completely consumed links except for the last one:
            prev_node = self.linked_list.next
            node = prev_node.next
            while node and node.consumed_count == self.num_consumers:
                prev_node = node
                node = node.next
            self.linked_list.next = prev_node


N_PRINTERS = 3  # The number of printer threads:

obj = data_provider(N_PRINTERS)

def printing(id):
    last_node = obj.linked_list

    while True:
        with obj.condition:
            obj.condition.wait_for(
                lambda: not obj.running or last_node.next
            )

        if not last_node.next:
            return

        last_node = last_node.next
        while True:
            print(id, ':', last_node.data)
            with obj.lock:
                last_node.consumed_count += 1
            if not last_node.next:
                break
            last_node = last_node.next

        obj.remove_consumed_nodes()


printer_threads = []

for i in range(N_PRINTERS):
    thread = threading.Thread(target=printing, args=(i,))
    thread.start()
    printer_threads.append(thread)

for thread in printer_threads:
    thread.join()

print('End')
print(obj.linked_list)

Prints:

1 : {'Name': 'Data', 'Count': 1}
0 : {'Name': 'Data', 'Count': 1}
2 : {'Name': 'Data', 'Count': 1}
0 : {'Name': 'Data', 'Count': 2}
1 : {'Name': 'Data', 'Count': 2}
2 : {'Name': 'Data', 'Count': 2}
0 : {'Name': 'Data', 'Count': 3}
2 : {'Name': 'Data', 'Count': 3}
1 : {'Name': 'Data', 'Count': 3}
0 : {'Name': 'Data', 'Count': 4}
1 : {'Name': 'Data', 'Count': 4}
2 : {'Name': 'Data', 'Count': 4}
0 : {'Name': 'Data', 'Count': 5}
1 : {'Name': 'Data', 'Count': 5}
2 : {'Name': 'Data', 'Count': 5}
0 : {'Name': 'Data', 'Count': 6}
2 : {'Name': 'Data', 'Count': 6}
1 : {'Name': 'Data', 'Count': 6}
0 : {'Name': 'Data', 'Count': 7}
1 : {'Name': 'Data', 'Count': 7}
2 : {'Name': 'Data', 'Count': 7}
2 : {'Name': 'Data', 'Count': 8}
0 : {'Name': 'Data', 'Count': 8}
1 : {'Name': 'Data', 'Count': 8}
2 : {'Name': 'Data', 'Count': 9}
0 : {'Name': 'Data', 'Count': 9}
1 : {'Name': 'Data', 'Count': 9}
Done producing
2 : {'Name': 'Data', 'Count': 10}
0 : {'Name': 'Data', 'Count': 10}
1 : {'Name': 'Data', 'Count': 10}
End
[data: None, consumed_count: 0, next: [data: {'Name': 'Data', 'Count': 10}, consumed_count: 3, next: None]]

Reusable MultiConsumerProducer Class

The above code can be re-engineered for improved reusability.

import threading
from typing import Iterable, List, Any

class MultiConsumerProducer:
    class Node:
        def __init__(self, data: Any, previous_node: 'Node'):
            self._consumed_count = 0
            self._next = None
            self._data = data
            if previous_node:
                previous_node._next = self

        @property
        def data(self) -> Any:
            return self._data

        def __repr__(self):
            return f'[_data: {repr(self._data)}, _consumed_count: {self._consumed_count}, _next: {self._next}]'

    def __init__(self, num_consumers: int, data_collection: Iterable) -> None:
        self._num_consumers = num_consumers
        self._lock = threading.Lock()
        self._condition = threading.Condition()
        self._running = True
        # To simplify the code, the first node in the list is a dummy:
        self._linked_list = MultiConsumerProducer.Node(None, None)

        threading.Thread(target=self._data_generator, args=(data_collection,), daemon=True).start()

    def print_nodes(self) -> None:
        """Print linked list of nodes."""

        print(self._linked_list)

    def _data_generator(self, data_collection):
        """Generate nodes."""

        last_node = self._linked_list
        for data in data_collection:
            last_node = MultiConsumerProducer.Node(data, last_node)
            with self._condition:
                self._condition.notify_all()

        self._running = False
        with self._condition:
            self._condition.notify_all()

    def get_next_nodes(self, last_node_processed: Node=None) -> List[Node]:
        """Get next list of ready nodes."""

        last_node = last_node_processed or self._linked_list

        with self._condition:
            self._condition.wait_for(
                lambda: not self._running or last_node._next
            )

        if not last_node._next:
            return []

        nodes = []
        last_node = last_node._next
        while True:
            nodes.append(last_node)
            if not last_node._next:
                return nodes
            last_node = last_node._next

    def consumed_node(self, node: Node) -> None:
        """Show node has been consumed."""

        with self._lock:
            node._consumed_count += 1

            if node._consumed_count == self._num_consumers:
                # Remove completely consumed links except for the last one:
                prev_node = self._linked_list._next
                node = prev_node._next
                while node and node._consumed_count == self._num_consumers:
                    prev_node = node
                    node = node._next
                self._linked_list._next = prev_node

##############################################################

def producer():
    import time

    for cnt in range(1, 11):  # Reduced count for demo purposes
        # For demo purposes let's introduce a pause:
        time.sleep(.5)
        yield {'Name': 'Data', 'Count': cnt}
    print('Done producing')


N_PRINTERS = 3  # The number of printer threads:

obj = MultiConsumerProducer(N_PRINTERS, producer())

def printing(id):
    last_node_processed = None

    while (nodes := obj.get_next_nodes(last_node_processed)):
        for last_node_processed in nodes:
            print(id, ':', last_node_processed.data)
            obj.consumed_node(last_node_processed)

printer_threads = []

for i in range(N_PRINTERS):
    thread = threading.Thread(target=printing, args=(i,))
    thread.start()
    printer_threads.append(thread)

for thread in printer_threads:
    thread.join()

print('End')

print('\nNodes:')
obj.print_nodes()

One More Time

The following function generates an abstract base class that uses queues for delivering work and supports a producer and consumers running in either threads or processes.

def generate_multi_consumer_producer(n_consumers, use_multiprocessing: bool=False, queue_size=0):
    """Generate an abstract base for single producer multiple consumers.
        n_consumers: The number of consumers.
        use_multiprocessing: True to use producer/consumers that run in child processes
                             otherwise child threads are used.
        queue_size: If producing is faster than consumption, you can specify a
                    positive value for queue_size to prevent the queues from continuously
                    growing."""

    from abc import ABC, abstractmethod
    from typing import List, Iterable

    if use_multiprocessing:
        from multiprocessing import Process as ExecutionClass, JoinableQueue as QueueClass
    else:
        from threading import Thread as ExecutionClass
        from queue import Queue as QueueClass

    class MultiConsumerProducer(ABC):
        def __init__(self, n_consumers: int=n_consumers, queue_size=queue_size) -> None:
            self._n_consumers = n_consumers
            self._queues = [QueueClass(queue_size) for _ in range(n_consumers)]
            self._producer = ExecutionClass(target=self._run)
            self._producer.start()

        def _run(self):
            # Start the consumers:
            for consumer_id in range(self._n_consumers):
                ExecutionClass(
                    target=self._consumer,
                    args=(consumer_id, self._queues[consumer_id]),
                    daemon=True
                ).start()

            # Produce the data
            for data in self.produce():
                for queue in self._queues:
                    queue.put(data)

            # Wait for all work to be completed
            for queue in self._queues:
                queue.join()

        def join(self) -> None:
            """Wait for all work to complete."""

            self._producer.join()

        def _consumer(self, consumer_id: int, queue: QueueClass):
            while True:
                data = queue.get()
                try:
                    self.consume(consumer_id, data)
                except Exception as e:
                    print(f'Exception in consumer {consumer_id}: {e}')
                finally:
                    queue.task_done()

        @abstractmethod
        def produce(self):
            """This should be a generator function."""
            pass

        @abstractmethod
        def consume(self, consumer_id: int, data: object) -> None:
            pass

    return MultiConsumerProducer

The following is an example usage where I have 3 existing consumer functions and an existing producer function showing how you could use them without modification:

def consumer_0(consumer_id, n):
    print(f'id {consumer_id}: {n} ** 1 = {n}')

def consumer_1(consumer_id, n):
    print(f'id {consumer_id}: {n} ** 2 = {n ** 2}')

def consumer_2(consumer_id, n):
    print(f'id {consumer_id}: {n} ** 3 = {n ** 3}')

def producer():
    import time

    for n in range(1, 11):  # Reduced count for demo purposes
        # For demo purposes let's introduce a pause:
        time.sleep(.5)
        yield n
    print('Done producing', flush=True)

MultiConsumerProducer = generate_multi_consumer_producer(3, use_multiprocessing=True)

class MyMultiConsumerProducer(MultiConsumerProducer):
    """An example that uses 3 different consumers."""
    consumers = [consumer_0, consumer_1, consumer_2]

    def produce(self):
        yield from producer()

    def consume(self, consumer_id, data):
        self.consumers[consumer_id](consumer_id, data)

if __name__ == '__main__':
    p = MyMultiConsumerProducer(3)
    # Wait for all work to complete:
    p.join()

Prints:

id 0: 1 ** 1 = 1
id 1: 1 ** 2 = 1
id 2: 1 ** 3 = 1
id 0: 2 ** 1 = 2
id 2: 2 ** 3 = 8
id 1: 2 ** 2 = 4
id 0: 3 ** 1 = 3
id 2: 3 ** 3 = 27
id 1: 3 ** 2 = 9
id 2: 4 ** 3 = 64
id 1: 4 ** 2 = 16
id 0: 4 ** 1 = 4
id 0: 5 ** 1 = 5
id 1: 5 ** 2 = 25
id 2: 5 ** 3 = 125
id 0: 6 ** 1 = 6
id 1: 6 ** 2 = 36
id 2: 6 ** 3 = 216
id 0: 7 ** 1 = 7
id 1: 7 ** 2 = 49
id 2: 7 ** 3 = 343
id 0: 8 ** 1 = 8
id 1: 8 ** 2 = 64
id 2: 8 ** 3 = 512
id 0: 9 ** 1 = 9
id 2: 9 ** 3 = 729
id 1: 9 ** 2 = 81
Done producing
id 2: 10 ** 3 = 1000
id 1: 10 ** 2 = 100
id 0: 10 ** 1 = 10
