I have a continuous flow of data (like analog data) that’s coming in.
- I’d like to record it sequentially.
- While it’s recording, I’d like to process it on the side, without blocking the recording, and concatenate each result onto an existing string.
I think it should be straightforward, but I can’t seem to figure it out.
import time
import random
def continuous_recording_stream() -> int:
    """Simulate recording one chunk of the incoming stream.

    Draws two random integers in the range 1..10, blocks for half of their
    sum (in seconds) to stand in for the time the recording takes, and
    returns the sum as the recorded value.

    Returns:
        The sum of the two random integers (between 2 and 20).
    """
    # must be done in sequential order so no client.submit
    x = random.randint(1, 10)
    y = random.randint(1, 10)
    time.sleep((x + y) / 2)
    return x + y
def process_output(result: int) -> str:
    """Simulate slow processing of one recorded value.

    Blocks for ``result`` seconds, prints the value, and returns it
    formatted as a string.

    Args:
        result: The recorded value; also used as the sleep duration.

    Returns:
        ``result`` rendered with default f-string formatting.
    """
    time.sleep(result)
    print(f"Result: {result}")
    return f"{result}"
def main() -> None:
    """Record and process chunks in a single sequential loop, forever.

    Each iteration records one chunk and then processes it in the same
    thread before appending the processed text to ``results``. Because
    ``process_output`` is called inline, the next recording cannot start
    until processing of the previous chunk has finished.
    """
    results = ""
    while True:
        result = continuous_recording_stream()
        # Inline call: recording is stalled for as long as processing takes.
        results += process_output(result)
        print(f"Results: {results}")
Apparently articulating the problem works: I pasted my question into Claude and, after a few revisions, was able to get the following:
import time
import random
from dask.distributed import Client
from collections import deque
def continuous_recording_stream() -> int:
    """Simulate recording one chunk of the incoming stream.

    Draws two random integers in the range 1..10, prints them together with
    their sum, blocks for one second to stand in for the recording time,
    and returns the sum.

    Returns:
        The sum of the two random integers (between 2 and 20).
    """
    x = random.randint(1, 10)
    y = random.randint(1, 10)
    # One f-string produces the same space-separated line as the original
    # mix of a placeholder-less f-string and positional print arguments.
    print(f"received {x} {y} output {x + y}")
    time.sleep(1)
    return x + y
def process_output(result: int) -> str:
    """Simulate slow processing of one recorded value.

    Blocks for a third of ``result`` seconds and returns the value
    formatted as a string.

    Args:
        result: The recorded value; a third of it is the sleep duration.

    Returns:
        ``result`` rendered with default f-string formatting.
    """
    time.sleep(result / 3)
    return f"{result}"
def main() -> None:
    """Record sequentially while processing chunks on a Dask client.

    Every recorded value is submitted to the client and its future is
    appended to a FIFO queue. After each recording, futures at the head of
    the queue whose status is ``'finished'`` are popped and their results
    appended to ``results``, so output keeps submission order. On
    ``KeyboardInterrupt`` the loop stops, the remaining futures are waited
    on in order, the final string is printed, and the client is closed.
    """
    import builtins

    # `display` is injected into builtins by IPython/Jupyter only; fall back
    # to `print` so the function also runs as a plain script.
    show = getattr(builtins, "display", print)

    client = Client()
    results = ""
    queue = deque()
    try:
        show(client)
        while True:
            result = continuous_recording_stream()
            # Submit new task to the client and add future to the queue
            future = client.submit(process_output, result)
            queue.append(future)
            # Process all ready tasks; stop at the first unfinished one so
            # results are never appended out of order.
            while queue and queue[0].status == 'finished':
                results += f"-{queue.popleft().result()}"
            print(f"Results: {results}")
    except KeyboardInterrupt:
        print("Stopping the process...")
    finally:
        try:
            # Process any remaining tasks
            while queue:
                results += f"-{queue.popleft().result()}"
            print(f"Final Results: {results}")
        finally:
            # Close the client even if waiting on a remaining task raised.
            client.close()
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
1 Like