Apparently articulating the problem works: I pasted my question, with a few revisions, into Claude and was able to get the following:
import time
import random
from dask.distributed import Client
from collections import deque
def continuous_recording_stream(delay: float = 1.0) -> int:
    """Simulate one reading from a continuously recording source.

    Draws two random integers between 1 and 10 (inclusive), prints them
    together with their sum, pauses for ``delay`` seconds, and returns the sum.

    Args:
        delay: Seconds to pause after printing the reading. Defaults to 1
            second, the pause the function has always used.

    Returns:
        The sum of the two integers drawn (between 2 and 20 inclusive).
    """
    x = random.randint(1, 10)
    y = random.randint(1, 10)
    total = x + y
    print("received", x, y, f"output {total}")
    time.sleep(delay)
    return total
def process_output(result):
    """Simulate processing one reading and return it as text.

    Sleeps for ``result / 3`` seconds, so larger readings take longer to
    process, then returns the reading converted to a string.

    Args:
        result: The numeric reading to process. It must be non-negative,
            because ``time.sleep`` rejects negative durations.

    Returns:
        ``result`` rendered as a ``str``.
    """
    time.sleep(result / 3)
    return str(result)
def main():
    """Stream readings, process them on a Dask client, and print them in order.

    Each value returned by ``continuous_recording_stream`` is submitted to a
    ``dask.distributed`` client as a ``process_output`` task. The futures are
    kept in a FIFO queue and are only consumed from the head, so results are
    appended in submission order even though tasks take different amounts of
    time. The loop runs until interrupted with Ctrl-C; the remaining futures
    are then awaited and the final result string is printed.
    """
    # The context manager closes the client even if showing it or collecting
    # a result raises.
    with Client() as client:
        try:
            display(client)  # noqa: F821 - provided by IPython/Jupyter
        except NameError:
            # Outside a notebook there is no `display`; fall back to text.
            print(client)

        results = ""
        queue = deque()
        try:
            while True:
                result = continuous_recording_stream()
                # Submit new task to the client and add future to the queue
                future = client.submit(process_output, result)
                queue.append(future)
                # Consume finished tasks from the head only, to keep results
                # in submission order.
                # NOTE(review): a head future whose status is anything other
                # than 'finished' (e.g. a failed task) stays queued until the
                # final drain below - confirm that is the intended behaviour.
                while queue and queue[0].status == 'finished':
                    results += f"-{queue.popleft().result()}"
                print(f"Results: {results}")
        except KeyboardInterrupt:
            print("Stopping the process...")
        finally:
            # Wait for whatever is still in flight so no submitted reading
            # is lost.
            while queue:
                results += f"-{queue.popleft().result()}"
            print(f"Final Results: {results}")
# Run the streaming demo only when executed directly, not when imported.
if __name__ == "__main__":
    main()