How can I continuously process results from a single ordered stream?

I have a continuous flow of data (like analog data) that’s coming in.

  1. I’d like to record it sequentially.
  2. While it’s recording, I’d like to process the data on the side, without blocking the recording, and concatenate each result onto an existing string.

I think it should be straightforward, but I can’t seem to figure it out.

import time
import random


def continuous_recording_stream():
    """Simulate recording one sample from the incoming stream.

    Draws two random integers between 1 and 10 (inclusive), blocks for half
    of their sum in seconds, and returns the sum.
    """
    # Recording has to happen strictly in order, so it is never handed to
    # client.submit.
    first = random.randint(1, 10)
    second = random.randint(1, 10)
    total = first + second
    time.sleep(total / 2)
    return total


def process_output(result):
    """Simulate slow processing of one recorded value.

    Blocks for ``result`` seconds, prints the value, and returns it formatted
    as a string.
    """
    delay_seconds = result
    time.sleep(delay_seconds)
    formatted = f"{result}"
    print(f"Result: {result}")
    return formatted


def main():
    """Record and process samples forever, one after the other.

    Each sample is processed synchronously before the next one is recorded,
    so processing blocks the recording. The processed strings are appended to
    a running string that is printed after every sample.
    """
    results = ""
    while True:
        sample = continuous_recording_stream()
        processed = process_output(sample)
        results = results + processed
        print(f"Results: {results}")

Apparently articulating the problem helps: I pasted my question into Claude and, after a few revisions, got the following:

import time
import random
from dask.distributed import Client
from collections import deque

def continuous_recording_stream():
    """Simulate recording one sample from the incoming stream.

    Draws two random integers between 1 and 10 (inclusive), prints them along
    with their sum, blocks for one second, and returns the sum.
    """
    first = random.randint(1, 10)
    second = random.randint(1, 10)
    total = first + second
    print("received", first, second, f"output {total}")
    time.sleep(1)
    return total

def process_output(result):
    """Simulate slow processing of one recorded value.

    Blocks for a third of ``result`` seconds and returns ``result`` formatted
    as a string.
    """
    delay_seconds = result / 3
    time.sleep(delay_seconds)
    formatted = f"{result}"
    return formatted

def main():
    """Record continuously while processing each sample on a Dask cluster.

    Every value returned by ``continuous_recording_stream`` is submitted to
    the Dask client to be run through ``process_output``; the recording loop
    never waits on a result. Futures are kept in submission order and drained
    from the front of the queue once they complete, so ``results`` grows in
    the same order in which the samples were recorded.

    Runs until interrupted with Ctrl-C, then waits for the outstanding tasks
    and prints the final string. An exception raised by a task propagates to
    the caller when that task's result is collected.
    """
    results = ""
    queue = deque()  # pending futures, oldest first

    # The context manager closes the client on every exit path, including when
    # collecting a result raises.
    with Client() as client:
        try:
            display(client)  # rich summary when running under IPython/Jupyter
        except NameError:
            # `display` only exists inside IPython; fall back for plain scripts.
            print(client)

        try:
            while True:
                result = continuous_recording_stream()

                # Hand the sample to a worker; submit() returns immediately.
                queue.append(client.submit(process_output, result))

                # Drain completed futures from the front only, to preserve
                # order. done() is also true for failed or cancelled futures,
                # so a failed task raises here instead of silently blocking
                # everything queued behind it.
                while queue and queue[0].done():
                    results += f"-{queue.popleft().result()}"

                print(f"Results: {results}")

        except KeyboardInterrupt:
            print("Stopping the process...")
        finally:
            # Block on whatever is still in flight so no result is dropped.
            while queue:
                results += f"-{queue.popleft().result()}"
            print(f"Final Results: {results}")

# Start the record-and-process loop only when this file is run as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
1 Like