Imagine you want to broadcast data produced by a generator. There are many consumers that receive and consume the data. The generator produces data regardless of whether there are any consumers, and every consumer that joins should receive the data from the moment it joined. The obvious solution is the observer pattern, but what if the number of consumers is very high and the rate of data generation is also very high? In that case, while you are still broadcasting data item 1 to all observers, data item 2 is already ready. Is there any solution to this problem? One thing I wonder about: is it possible for several threads to execute the same function in a shared space? In other words, is it possible to run a function once and have all the threads get the result together?
Look at this code:
import threading
from threading import Event
from time import sleep

class data_provider():
    def __init__(self) -> None:
        self.cnt = 0
        self.stop = False
        self.data_received = Event()
        self.data_received.clear()
        data_generator_thread = threading.Thread(target=self.data_generator)
        data_generator_thread.start()

    def data_sender(self):
        self.data_received.wait()
        while not self.stop:
            yield {'Name': 'Data', 'Count': self.cnt}
            self.data_received.clear()
            self.data_received.wait()

    def data_generator(self):
        while self.cnt < 20:
            self.cnt += 1
            self.data_received.set()
            sleep(1)
        self.stop = True
        self.data_received.set()
and main.py:
from threading import Thread
# data_provider is the class defined above

obj = data_provider()

def printing(id):
    for data in obj.data_sender():
        print(str(id) + ' : ' + str(data))

our_threads = []
for i in range(100):
    our_threads.append(Thread(target=printing, args=(i,)))
    our_threads[i].start()
for i in range(100):
    our_threads[i].join()
print('End')
The goal is for all the threads to run data_sender() once and all get the same result as soon as it becomes available.
Any ideas?
If you are looking to have a single producer of some data and multiple “consumers” of that data, then the following code uses Condition instances with notification. For demo purposes the producer only produces 10 pieces of data and we have 3 consumers, which keeps the output to a reasonable length.
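For background, here is a minimal sketch of the Condition wait/notify pattern the solution below relies on. The names (shared, done, producer, consumer) are illustrative only and are not part of the solution that follows:
import threading

condition = threading.Condition()
shared = []    # items appended by the producer
done = False   # set when the producer is finished

def producer():
    global done
    for i in range(3):
        with condition:
            shared.append(i)
            condition.notify_all()  # wake every waiting consumer
    with condition:
        done = True
        condition.notify_all()

def consumer(name):
    seen = 0  # how many items this consumer has processed
    while True:
        with condition:
            condition.wait_for(lambda: done or len(shared) > seen)
            if seen == len(shared):  # done and nothing new
                return
            new_items = shared[seen:]
            seen = len(shared)
        for item in new_items:
            print(name, ':', item)

threads = [threading.Thread(target=consumer, args=(i,)) for i in range(2)]
for t in threads:
    t.start()
producer()
for t in threads:
    t.join()
Note that this sketch never discards consumed items, so the shared list grows without bound; the linked list below addresses exactly that.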
Update Using Linked List
import threading

class Node:
    def __init__(self, data, previous_node):
        self.consumed_count = 0
        self.next = None
        self.data = data
        if previous_node:
            previous_node.next = self

    def __repr__(self):
        return f'[data: {repr(self.data)}, consumed_count: {self.consumed_count}, next: {self.next}]'

class data_provider:
    def __init__(self, num_consumers) -> None:
        self.num_consumers = num_consumers
        self.lock = threading.Lock()
        self.condition = threading.Condition()
        self.running = True
        # To simplify the code, the first node in the list is a dummy:
        self.linked_list = Node(None, None)
        data_generator_thread = threading.Thread(target=self.data_generator)
        data_generator_thread.start()

    def data_generator(self):
        import time

        last_node = self.linked_list
        for cnt in range(1, 11):  # Reduced count for demo purposes
            # For demo purposes let's introduce a pause:
            time.sleep(.5)
            last_node = Node({'Name': 'Data', 'Count': cnt}, last_node)
            with self.condition:
                self.condition.notify_all()
        print('Done producing')
        # Let consumers know that no more data will be coming:
        with self.condition:
            self.running = False
            self.condition.notify_all()

    def remove_consumed_nodes(self):
        with self.lock:
            # Remove completely consumed links except for the last one:
            prev_node = self.linked_list.next
            node = prev_node.next
            while node and node.consumed_count == self.num_consumers:
                prev_node = node
                node = node.next
            self.linked_list.next = prev_node

N_PRINTERS = 3  # The number of printer threads

obj = data_provider(N_PRINTERS)

def printing(id):
    last_node = obj.linked_list
    while True:
        with obj.condition:
            obj.condition.wait_for(
                lambda: not obj.running or last_node.next
            )
        if not last_node.next:
            return
        last_node = last_node.next
        while True:
            print(id, ':', last_node.data)
            with obj.lock:
                last_node.consumed_count += 1
            if not last_node.next:
                break
            last_node = last_node.next
        obj.remove_consumed_nodes()

printer_threads = []
for i in range(N_PRINTERS):
    thread = threading.Thread(target=printing, args=(i,))
    thread.start()
    printer_threads.append(thread)
for thread in printer_threads:
    thread.join()
print('End')
print(obj.linked_list)
Prints:
1 : {'Name': 'Data', 'Count': 1}
0 : {'Name': 'Data', 'Count': 1}
2 : {'Name': 'Data', 'Count': 1}
0 : {'Name': 'Data', 'Count': 2}
1 : {'Name': 'Data', 'Count': 2}
2 : {'Name': 'Data', 'Count': 2}
0 : {'Name': 'Data', 'Count': 3}
2 : {'Name': 'Data', 'Count': 3}
1 : {'Name': 'Data', 'Count': 3}
0 : {'Name': 'Data', 'Count': 4}
1 : {'Name': 'Data', 'Count': 4}
2 : {'Name': 'Data', 'Count': 4}
0 : {'Name': 'Data', 'Count': 5}
1 : {'Name': 'Data', 'Count': 5}
2 : {'Name': 'Data', 'Count': 5}
0 : {'Name': 'Data', 'Count': 6}
2 : {'Name': 'Data', 'Count': 6}
1 : {'Name': 'Data', 'Count': 6}
0 : {'Name': 'Data', 'Count': 7}
1 : {'Name': 'Data', 'Count': 7}
2 : {'Name': 'Data', 'Count': 7}
2 : {'Name': 'Data', 'Count': 8}
0 : {'Name': 'Data', 'Count': 8}
1 : {'Name': 'Data', 'Count': 8}
2 : {'Name': 'Data', 'Count': 9}
0 : {'Name': 'Data', 'Count': 9}
1 : {'Name': 'Data', 'Count': 9}
Done producing
2 : {'Name': 'Data', 'Count': 10}
0 : {'Name': 'Data', 'Count': 10}
1 : {'Name': 'Data', 'Count': 10}
End
[data: None, consumed_count: 0, next: [data: {'Name': 'Data', 'Count': 10}, consumed_count: 3, next: None]]
Reusable MultiConsumerProducer Class
The above code can be re-engineered for improved reusability.
import threading
from typing import Iterable, List, Any

class MultiConsumerProducer:
    class Node:
        def __init__(self, data: Any, previous_node: 'Node'):
            self._consumed_count = 0
            self._next = None
            self._data = data
            if previous_node:
                previous_node._next = self

        @property
        def data(self) -> Any:
            return self._data

        def __repr__(self):
            return f'[_data: {repr(self._data)}, _consumed_count: {self._consumed_count}, _next: {self._next}]'

    def __init__(self, num_consumers: int, data_collection: Iterable) -> None:
        self._num_consumers = num_consumers
        self._lock = threading.Lock()
        self._condition = threading.Condition()
        self._running = True
        # To simplify the code, the first node in the list is a dummy:
        self._linked_list = MultiConsumerProducer.Node(None, None)
        threading.Thread(target=self._data_generator, args=(data_collection,), daemon=True).start()

    def print_nodes(self) -> None:
        """Print linked list of nodes."""
        print(self._linked_list)

    def _data_generator(self, data_collection):
        """Generate nodes."""
        last_node = self._linked_list
        for data in data_collection:
            last_node = MultiConsumerProducer.Node(data, last_node)
            with self._condition:
                self._condition.notify_all()
        self._running = False
        with self._condition:
            self._condition.notify_all()

    def get_next_nodes(self, last_node_processed: Node = None) -> List[Node]:
        """Get next list of ready nodes."""
        last_node = last_node_processed or self._linked_list
        with self._condition:
            self._condition.wait_for(
                lambda: not self._running or last_node._next
            )
        if not last_node._next:
            return []
        nodes = []
        last_node = last_node._next
        while True:
            nodes.append(last_node)
            if not last_node._next:
                return nodes
            last_node = last_node._next

    def consumed_node(self, node: Node) -> None:
        """Show node has been consumed."""
        with self._lock:
            node._consumed_count += 1
            if node._consumed_count == self._num_consumers:
                # Remove completely consumed links except for the last one:
                prev_node = self._linked_list._next
                node = prev_node._next
                while node and node._consumed_count == self._num_consumers:
                    prev_node = node
                    node = node._next
                self._linked_list._next = prev_node

##############################################################

def producer():
    import time

    for cnt in range(1, 11):  # Reduced count for demo purposes
        # For demo purposes let's introduce a pause:
        time.sleep(.5)
        yield {'Name': 'Data', 'Count': cnt}
    print('Done producing')

N_PRINTERS = 3  # The number of printer threads

obj = MultiConsumerProducer(N_PRINTERS, producer())

def printing(id):
    last_node_processed = None
    while (nodes := obj.get_next_nodes(last_node_processed)):
        for last_node_processed in nodes:
            print(id, ':', last_node_processed.data)
            obj.consumed_node(last_node_processed)

printer_threads = []
for i in range(N_PRINTERS):
    thread = threading.Thread(target=printing, args=(i,))
    thread.start()
    printer_threads.append(thread)
for thread in printer_threads:
    thread.join()
print('End')

print('\nNodes:')
obj.print_nodes()
One More Time
The following function generates an abstract base class that uses queues for delivering work and supports producers/consumers running in either threads or processes:
def generate_multi_consumer_producer(n_consumers, use_multiprocessing: bool = False, queue_size=0):
    """Generate an abstract base for a single producer with multiple consumers.

    n_consumers: The number of consumers.
    use_multiprocessing: True to use producers/consumers that run in child
        processes; otherwise child threads are used.
    queue_size: If producing is faster than consumption, you can specify a
        positive value for queue_size to prevent the queues from continuously
        growing."""
    from abc import ABC, abstractmethod

    if use_multiprocessing:
        from multiprocessing import Process as ExecutionClass, JoinableQueue as QueueClass
    else:
        from threading import Thread as ExecutionClass
        from queue import Queue as QueueClass

    class MultiConsumerProducer(ABC):
        def __init__(self, n_consumers: int = n_consumers, queue_size=queue_size) -> None:
            self._n_consumers = n_consumers
            self._queues = [QueueClass(queue_size) for _ in range(n_consumers)]
            self._producer = ExecutionClass(target=self._run)
            self._producer.start()

        def _run(self):
            # Start the consumers:
            for consumer_id in range(self._n_consumers):
                ExecutionClass(
                    target=self._consumer,
                    args=(consumer_id, self._queues[consumer_id]),
                    daemon=True
                ).start()
            # Produce the data:
            for data in self.produce():
                for queue in self._queues:
                    queue.put(data)
            # Wait for all work to be completed:
            for queue in self._queues:
                queue.join()

        def join(self) -> None:
            """Wait for all work to complete."""
            self._producer.join()

        def _consumer(self, consumer_id: int, queue: QueueClass):
            while True:
                data = queue.get()
                try:
                    self.consume(consumer_id, data)
                except Exception as e:
                    print(f'Exception in consumer {consumer_id}: {e}')
                finally:
                    queue.task_done()

        @abstractmethod
        def produce(self):
            """This should be a generator function."""
            pass

        @abstractmethod
        def consume(self, consumer_id: int, data: object) -> None:
            pass

    return MultiConsumerProducer
The following is an example usage where I have 3 existing consumer functions and an existing producer function, showing how you could use them without modification:
def consumer_0(consumer_id, n):
    print(f'id {consumer_id}: {n} ** 1 = {n}')

def consumer_1(consumer_id, n):
    print(f'id {consumer_id}: {n} ** 2 = {n ** 2}')

def consumer_2(consumer_id, n):
    print(f'id {consumer_id}: {n} ** 3 = {n ** 3}')

def producer():
    import time

    for n in range(1, 11):  # Reduced count for demo purposes
        # For demo purposes let's introduce a pause:
        time.sleep(.5)
        yield n
    print('Done producing', flush=True)

MultiConsumerProducer = generate_multi_consumer_producer(3, use_multiprocessing=True)

class MyMultiConsumerProducer(MultiConsumerProducer):
    """An example that uses 3 different consumers."""

    consumers = [consumer_0, consumer_1, consumer_2]

    def produce(self):
        yield from producer()

    def consume(self, consumer_id, data):
        self.consumers[consumer_id](consumer_id, data)

if __name__ == '__main__':
    p = MyMultiConsumerProducer(3)
    # Wait for all work to complete:
    p.join()
Prints:
id 0: 1 ** 1 = 1
id 1: 1 ** 2 = 1
id 2: 1 ** 3 = 1
id 0: 2 ** 1 = 2
id 2: 2 ** 3 = 8
id 1: 2 ** 2 = 4
id 0: 3 ** 1 = 3
id 2: 3 ** 3 = 27
id 1: 3 ** 2 = 9
id 2: 4 ** 3 = 64
id 1: 4 ** 2 = 16
id 0: 4 ** 1 = 4
id 0: 5 ** 1 = 5
id 1: 5 ** 2 = 25
id 2: 5 ** 3 = 125
id 0: 6 ** 1 = 6
id 1: 6 ** 2 = 36
id 2: 6 ** 3 = 216
id 0: 7 ** 1 = 7
id 1: 7 ** 2 = 49
id 2: 7 ** 3 = 343
id 0: 8 ** 1 = 8
id 1: 8 ** 2 = 64
id 2: 8 ** 3 = 512
id 0: 9 ** 1 = 9
id 2: 9 ** 3 = 729
id 1: 9 ** 2 = 81
Done producing
id 2: 10 ** 3 = 1000
id 1: 10 ** 2 = 100
id 0: 10 ** 1 = 10
Give each consumer a queue it reads from, and have the producer write into each queue?
Is it necessary that all consumers receive all data or can some data be thrown away to stay up to date with newest data?
It is necessary that all consumers receive all data from the moment they joined.
Well, another option is a single circular buffer into which the producer writes and everyone else reads from.
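For reference, a minimal sketch of that circular-buffer idea (all names here are illustrative; note that a consumer that falls a full lap behind silently loses the overwritten items, which is why the "all data" requirement above matters):
import threading

class RingBroadcast:
    def __init__(self, size):
        self.buf = [None] * size
        self.size = size
        self.head = 0  # total number of items ever published
        self.cond = threading.Condition()

    def publish(self, item):
        with self.cond:
            self.buf[self.head % self.size] = item
            self.head += 1
            self.cond.notify_all()  # wake all readers waiting for new data

    def read(self, pos):
        """Block until an item at or after pos exists; return (item, next_pos)."""
        with self.cond:
            self.cond.wait_for(lambda: self.head > pos)
            pos = max(pos, self.head - self.size)  # skip slots already overwritten
            return self.buf[pos % self.size], pos + 1
A consumer joins by starting its position at the current head, so it only sees data produced from the moment it joined.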
A linked list may be useful. The producer holds a reference to the last node of the list, and each consumer holds a reference to the last node it has processed; no references to prior nodes are held. The producer uses a Condition to notify_all when new data is appended. Each consumer can follow the linked list independently, and old, unused nodes are removed by garbage collection.
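A minimal sketch of the garbage-collection variant this comment describes (names are illustrative; compare with the explicit consumed_count bookkeeping in the answer above):
import threading

class Node:
    __slots__ = ('data', 'next')
    def __init__(self, data):
        self.data = data
        self.next = None

cond = threading.Condition()
tail = Node(None)  # the producer references only the newest node

def produce(item):
    global tail
    with cond:
        tail.next = Node(item)
        tail = tail.next  # the producer drops its reference to older nodes
        cond.notify_all()

def consume(last_seen):
    """Each consumer holds only its own last-seen node; returns the next node."""
    with cond:
        cond.wait_for(lambda: last_seen.next is not None)
        return last_seen.next
A consumer joins by taking the current tail as its starting node. Since nodes link only forward, once the producer and every consumer have advanced past a node nothing references it anymore, and Python frees it automatically.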