How to reset x in name_function?

The following behavior is interesting.
I collect data coming from futures, and once the collected data reaches a certain length, I write it out as a hive-partitioned parquet dataset, like this:

        dd.to_parquet(  # type: ignore
            df=dd.from_pandas(df, npartitions=1, sort=False),  # type: ignore
            path=clips_pq,
            compression=clips_compression,  # type: ignore
            schema=clips_schema,  # type: ignore
            partition_on=["ds", "lc", "ver"],
            write_index=False,
            engine="pyarrow",
            append=True,
            ignore_divisions=True,
            name_function=(lambda x: f"batch_{cnt_batch}_part_{x}.parquet"),
        )

I process voice-AI datasets and each process creates a unique set (ds-lc-ver: dataset-language_code-version). I was expecting:

./ds=xx/lc=yy/ver=3/
   batch_0_part_0.parquet
./ds=xx/lc=yy/ver=4/
   batch_0_part_0.parquet
   batch_1_part_0.parquet

But it “remembers” the x in part and gives:

./ds=xx/lc=yy/ver=3/
   batch_0_part_0.parquet
./ds=xx/lc=yy/ver=4/
   batch_0_part_1.parquet
   batch_1_part_2.parquet

Please note that v3 and v4 are created in independent runs. I did not encounter this behavior when the code was using pyarrow.

How can I reset that value x?

Hi again,

Here also, it would be nice to have some minimal reproducible example, to better understand how you call to_parquet several times.

cc also @martindurant who might have some explanation.

I believe the idea is to be extra certain that none of the partitions clash with existing ones: if you are appending, dask will find the maximum part number across the directories. It does not “inverse” your name_function when this happens; actually I don’t remember how it happens.

It seems you actually want append=False, but that would imply removing the existing files.

The actual code looks like

        part_i = block_index[0]
        filename = (
            f"part.{part_i + self.i_offset}.parquet"
            if self.name_function is None
            else self.name_function(part_i + self.i_offset)
        )

so you can construct your name_function to take this integer and do whatever you like with it. You could maybe have

        lambda x: f"batch_{cnt_batch}_part_{x - cnt_batch}.parquet"
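
To make the quoted snippet concrete, here is a minimal sketch in plain Python that mimics the filename logic without dask. `simulate_filenames` and `known_offset` are hypothetical names used only for illustration, and the offset values are made up; how dask actually arrives at `i_offset` when appending is not shown in this thread.

```python
from typing import Callable, List, Optional


def simulate_filenames(
    n_parts: int,
    i_offset: int,
    name_function: Optional[Callable[[int], str]] = None,
) -> List[str]:
    """Mimic the quoted logic: every partition index is shifted by i_offset."""
    names = []
    for part_i in range(n_parts):
        index = part_i + i_offset
        if name_function is None:
            names.append(f"part.{index}.parquet")
        else:
            names.append(name_function(index))
    return names


cnt_batch = 1

# Naming as in the question: x already includes the offset.
original = simulate_filenames(
    1, 2, lambda x: f"batch_{cnt_batch}_part_{x}.parquet"
)

# Assumption: the caller somehow knows the offset and subtracts it again.
known_offset = 2  # hypothetical value, not something dask hands to you
adjusted = simulate_filenames(
    1, known_offset, lambda x: f"batch_{cnt_batch}_part_{x - known_offset}.parquet"
)

print(original)  # ['batch_1_part_2.parquet']
print(adjusted)  # ['batch_1_part_0.parquet']
```

The adjusted variant only gives a "local" index if the caller can predict the offset that will be applied, which is an assumption in this sketch.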

Thank you for the answers and thank you for providing the code part.

I had the same impression. I am porting my pyarrow-based code to dask, so I need to re-think the naming.

It is not easy to create a minimal example from thousands of lines though. But this is what I’ve been doing:

  • I have audio datasets, which are versioned and each contain multiple language files.
  • I process each version with a single CLI command, which distributes the languages to processes, so each process handles a single ds-lc-ver.
  • In each process, I scan a tarfile and skip audio recordings that already exist (using a unique integer id field that I read from parquet into a set, so I only deal with the delta); the rest is transcoded and its audio metadata is extracted.
    • I collect the ClipRec objects in a list. When the list reaches a pre-specified length, I write out that batch, reset the list and go on (this keeps the file size around a specified value and, obviously, frees the memory).
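
The skip-existing and batching steps in the list above could look roughly like this. It is a sketch, not the actual code: `BatchBuffer`, `write_batch` and the plain integers standing in for ClipRec are hypothetical, and in the real pipeline the write callback would be the to_parquet call from the question.

```python
from typing import Callable, List, Set, Tuple


class BatchBuffer:
    """Collect items and hand them to a writer once batch_size is reached."""

    def __init__(self, batch_size: int, write: Callable[[int, List[int]], None]):
        if batch_size <= 0:
            raise ValueError("batch_size must be positive")
        self.batch_size = batch_size
        self.cnt_batch = 0  # number of batches written so far
        self._write = write
        self._items: List[int] = []

    def add(self, item: int) -> None:
        self._items.append(item)
        if len(self._items) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        """Write whatever is buffered (if anything) and start a new list."""
        if not self._items:
            return
        self._write(self.cnt_batch, self._items)
        self.cnt_batch += 1
        self._items = []  # fresh list, so the written batch can be freed


written: List[Tuple[int, List[int]]] = []


def write_batch(cnt_batch: int, items: List[int]) -> None:
    # Stand-in for the real parquet write.
    written.append((cnt_batch, list(items)))


existing_ids: Set[int] = {1, 4}  # ids already present in the parquet data
buffer = BatchBuffer(batch_size=3, write=write_batch)
for clip_id in range(7):
    if clip_id in existing_ids:  # only the delta is processed
        continue
    buffer.add(clip_id)
buffer.flush()  # write the last, partially filled batch

print(written)  # [(0, [0, 2, 3]), (1, [5, 6])]
```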

It seems you actually want append=False, but that would imply removing the existing files.

So I use append=True; the files would never collide within a hive leaf directory. I add data gradually, so the older files should stay :) I have checks in place in the CLI validation for when somebody tries to re-import the same ds-lc-ver (I check the hive directory).

I wanted the x for machines with large RAM, where I can collect (say) 100_000 clips and write them out in “parts” of 10_000 clips each, so x would be 0-9 in this case (but it is not).
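
For illustration, this is the numbering I had in mind, in plain Python. `split_into_parts` and `part_names` are hypothetical helpers written only for this sketch; they show the expected per-batch part index, not what dask produces.

```python
from typing import Iterator, List, Sequence, Tuple


def split_into_parts(
    items: Sequence[int], part_size: int
) -> Iterator[Tuple[int, Sequence[int]]]:
    """Yield (local_part_index, chunk); the index restarts at 0 per batch."""
    if part_size <= 0:
        raise ValueError("part_size must be positive")
    for local_index, start in enumerate(range(0, len(items), part_size)):
        yield local_index, items[start:start + part_size]


def part_names(cnt_batch: int, n_items: int, part_size: int) -> List[str]:
    """File names one batch would get if x were local to the batch."""
    items = range(n_items)  # stand-in for the collected clips
    return [
        f"batch_{cnt_batch}_part_{i}.parquet"
        for i, _ in split_into_parts(items, part_size)
    ]


names = part_names(cnt_batch=0, n_items=100_000, part_size=10_000)
print(len(names), names[0], names[-1])
# 10 batch_0_part_0.parquet batch_0_part_9.parquet
```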

As far as I can see, in my scenario, the “x” provided by dask will not be useful, as it increments globally. I think I can leave it as is although it would not carry semantically relevant information.


Since you don’t use the file names for anything, why not stay with the current behaviour?

I think I can leave it as is although it would not carry semantically relevant information.

I think that’s what I said :)


@martindurant, exactly :)
I was overcomplicating stuff, at least in the case of dask…
It was needed with pyarrow though.
