Hi,
I am trying to read an avro file using fastavro's schemaless reader. The schemaless reader reads one record at a time. I am trying to find out: is there a simple way to read records from an avro file? Any pattern would be good.
I tried to replicate what the dask read_avro code does, but it was unsuccessful.
The avro format is supposed to have a schema (Apache Avro™ 1.8.1 Documentation). Do you mean that the schema is included and separate for each record? How would you read them without dask?
Thanks for responding.
Using fastavro, a file can be written with the schemaless writer and then read with the schemaless reader by providing the schema separately.
https://fastavro.readthedocs.io/en/latest/writer.html#fastavro._write_py.schemaless_writer
https://fastavro.readthedocs.io/en/latest/reader.html#fastavro._read_py.schemaless_reader
However, with the schemaless reader only one record can be read at a time. Here is a sample of the code:
import urllib.request

import fastavro

records = []
with urllib.request.urlopen("avro_file") as fp:  # URL of the avro file
    while True:
        try:
            # read a single record; the schema has to be supplied every time
            record = fastavro.schemaless_reader(fp, schema, return_record_name=False)
            records.append(record)
        except StopIteration:
            break
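For reference, a file like that could be produced with the schemaless writer, roughly as follows (a minimal sketch; the schema, records, and output path are made up for illustration):

import fastavro

# Hypothetical record schema and data, just for illustration.
schema = {
    "type": "record",
    "name": "Example",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"},
    ],
}

# schemaless_writer emits one record per call with no file header,
# so the same schema has to be supplied again when reading.
with open("example.avro", "wb") as out:
    for record in [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]:
        fastavro.schemaless_writer(out, schema, record)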
Currently I am reading an avro file with the schemaless reader and then running some processing on it. I am trying to see if there is any benefit to converting the records into a bag or something similar to the read_avro method provided in the core libs.
Ah, so it’s not schemaless (despite the name), but “bring your own schema”.
dask.bag is convenient if you have functional-style processing (like itertools), and your snippet could easily be made into a generator, where each file is one batch. So yes, bag is a good model.
However, if you want dask’s parallelism and out-of-core execution, it’s also fine to have a simple delayed function that does everything you mean to do to a file’s worth of data, and make a delayed call for each file in your list. Dask doesn’t need to know about the details of your processing in that case; it just calls your function.
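For example, something along these lines (a rough sketch, assuming a load(filename) function like your loop that returns one file's records as a list, plus a per-record processing function; both names are placeholders):

import dask
import dask.bag as db

# Each delayed load becomes one partition of the bag.
lazy_parts = [dask.delayed(load)(name) for name in filenames]
bag = db.from_delayed(lazy_parts)

# process_record is a placeholder for whatever per-record work you do.
results = bag.map(process_record).compute()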
Thanks Martin,
That is exactly what I am trying to do. I am following this pattern, as described on the website:
import urllib.request

import dask
import fastavro

# schema is assumed to be defined elsewhere, as in the snippet above


@dask.delayed
def load(filename):
    records = []
    with urllib.request.urlopen(filename) as fp:
        while True:
            try:
                record = fastavro.schemaless_reader(fp, schema, return_record_name=False)
                records.append(record)
            except StopIteration:
                break
    return records


@dask.delayed
def process(records):
    ...  # some processing: CPU-bound work, no IO


def f(filenames):
    results = []
    for filename in filenames:
        records = load(filename)
        result = process(records)
        results.append(result)
    return results


dask.compute(f(filenames))
My first question is whether the above code is good enough. Will bag help in this scenario?
Also, the load part is taking a lot of CPU, up to 100% on the workers. That is not a problem, but I just want to understand why that would be the case.
If you can do everything you want to do in process, this seems fine to me.
Also, the load part is taking a lot of CPU, up to 100% on the workers. That is not a problem, but I just want to understand why that would be the case.
fastavro isn’t all that fast…
Awkward-array now supports avro: ak.from_avro_file — Awkward Array 2.5.0 documentation; it should be significantly faster when you have a consistent schema, but I don’t know if it supports “schemaless”. dask-awkward does not yet cover avro, so you would still be using delayed.
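Something like this should work as a sketch, assuming regular Avro container files with an embedded schema (filenames is a placeholder for your list of paths):

import awkward as ak
import dask

# ak.from_avro_file reads a whole Avro container file into an awkward Array;
# whether it can handle header-less ("schemaless") files is unclear.
@dask.delayed
def load_file(path):
    return ak.from_avro_file(path)

arrays = dask.compute(*[load_file(p) for p in filenames])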