How to read an Avro file with the schemaless reader?

Hi,

I am trying to read an Avro file using fastavro's schemaless reader, which reads one record at a time. Is there a simple way to read all the records from an Avro file? Any pattern would be good.

I tried to replicate what the dask read_avro code does, but it was unsuccessful.

The Avro format is supposed to have a schema ( Apache Avro™ 1.8.1 Documentation ). Do you mean that the schema is included separately for each record? How would you read them without dask?

Thanks for responding.

Using fastavro, a file can be written with the schemaless writer and then read with the schemaless reader by providing the schema separately:

https://fastavro.readthedocs.io/en/latest/writer.html#fastavro._write_py.schemaless_writer
https://fastavro.readthedocs.io/en/latest/reader.html#fastavro._read_py.schemaless_reader
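
For example, a minimal sketch of writing and reading back a single record (the schema, record, and filename here are made up, just to show the pattern):

import fastavro

# a made-up schema and record, just to illustrate
schema = {
    "type": "record",
    "name": "Example",
    "fields": [{"name": "value", "type": "long"}],
}

with open("data.avro", "wb") as out:
    fastavro.schemaless_writer(out, schema, {"value": 1})

with open("data.avro", "rb") as fp:
    record = fastavro.schemaless_reader(fp, schema)  # schema provided separately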

However, with the schemaless reader, only one record can be read at a time. Here is some sample code:

import urllib.request

import fastavro

records = []
# "avro_file" stands in for the real URL; schema is the writer's Avro schema
with urllib.request.urlopen("avro_file") as fp:
    while True:
        try:
            record = fastavro.schemaless_reader(fp, schema, return_record_name=False)
            records.append(record)
        except StopIteration:  # fastavro signals end-of-stream this way
            break

Currently I am reading an Avro file with the schemaless reader and then running some processing on it. I am trying to see if there is any benefit to converting the records into a bag, or something similar to the read_avro method provided in the core libs.

Ah, so it’s not schemaless (despite the name), but “bring your own schema” :slight_smile:

dask.bag is convenient if you have functional-style processing (like itertools), and your snippet could easily be made into a generator, where each file is one batch. So yes, bag is a good model.
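
For example, a minimal sketch (filenames, schema, and "some_field" are made up; the reading loop is just your snippet wrapped in a plain function):

import urllib.request

import dask
import dask.bag as db
import fastavro

def load(url):
    # your reading loop, returning one file's worth of records
    records = []
    with urllib.request.urlopen(url) as fp:
        while True:
            try:
                records.append(fastavro.schemaless_reader(fp, schema, return_record_name=False))
            except StopIteration:
                break
    return records

# one bag partition per file
bag = db.from_delayed([dask.delayed(load)(fn) for fn in filenames])

# itertools-style processing; "some_field" is a made-up field name
counts = bag.map(lambda r: r["some_field"]).frequencies().compute()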

However, if you want dask's parallelism and out-of-core execution, it's also fine to have a simple delayed function that does everything you mean to do to one file's worth of data, and make a delayed call for each file in your list. Dask doesn't need to know the details of your processing in that case; it just calls your function.

Thanks Martin,

That is exactly what I am trying to do. I am following this pattern, as per the website:

@dask.delayed
def load(filename):
    records = []
    with urllib.request.urlopen(filename) as fp:
        while True:
            try:
                record = fastavro.schemaless_reader(fp, schema, return_record_name=False)
                records.append(record)
            except StopIteration:
                break
    return records

@dask.delayed
def process(records):
    ...  # some processing: CPU-bound work, no IO


def f(filenames):
    results = []
    for filename in filenames:
        records = load(filename)
        result = process(records)
        results.append(result)
    return results

dask.compute(f(filenames))

My first question is whether the above code is good enough. Would bag help in this scenario?

Also, the load part is using a lot of CPU, up to 100% on the workers. That is not a problem, but I just want to understand why that would be the case.

If you can do everything you want to do in process, this seems fine to me.

Also, the load part is using a lot of CPU, up to 100% on the workers. That is not a problem, but I just want to understand why that would be the case.

fastavro isn’t all that fast…


Thanks a lot Martin!!

Awkward-array now supports Avro: ak.from_avro_file — Awkward Array 2.5.0 documentation; it should be significantly faster when you have a consistent schema, but I don't know if it supports "schemaless". dask-awkward does not yet cover Avro, so you would still be using delayed.
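
Usage looks roughly like this (a sketch; it assumes a standard Avro container file with the schema embedded, and the filename is made up):

import awkward as ak

# reads a regular Avro container file (schema included) into an Awkward Array
array = ak.from_avro_file("data.avro")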
