Improve reader options #81

Open
kylebarron opened this issue Apr 9, 2022 · 6 comments

@kylebarron
Owner

No description provided.

@H-Plus-Time
Contributor

I've had a bit of a poke around with this in the course of the streams/object store experiments, and it works pretty well so far. At the moment I've got a JSON object producing a schema repr (some kind of schema summary felt necessary vs just guessing column names), and a Vec<String> -> ProjectionMask plugged into ParquetRecordBatchStreamBuilder's with_projection.
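Roughly, that name-to-mask mapping can look like the sketch below, written against the parquet crate's SchemaDescriptor and ProjectionMask types. mask_from_paths is just an illustrative helper name, and it keeps the silently-skip-unknown-columns behaviour from point 1 below rather than anything settled:

use parquet::arrow::ProjectionMask;
use parquet::schema::types::SchemaDescriptor;

// Map dotted column paths (e.g. "foo_struct.STATE_FIPS") to leaf column
// indices, then build a ProjectionMask for with_projection. Unknown paths
// are silently skipped here (see point 1 below).
fn mask_from_paths(schema: &SchemaDescriptor, paths: &[String]) -> ProjectionMask {
    let leaves = schema
        .columns()
        .iter()
        .enumerate()
        .filter(|(_, col)| paths.iter().any(|p| p == &col.path().string()))
        .map(|(i, _)| i);
    ProjectionMask::leaves(schema, leaves)
}

The resulting mask then goes into the builder's with_projection before build().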

So, design questions/opinions:

  1. At the moment, I'm silently ignoring any input columns that just don't exist - options:
    a. warn, display a succinct repr of the schema and move on; throw if there are absolutely no valid input columns
    b. throw if any of them are invalid (with the schema and a bit of highlighting)
  2. Rich reprs for the schema - flipping on the serde feature of arrow_schema seems to be enough.
  3. Nested schemas - these work perfectly with dot separators (they're embedded in the ColumnDescriptor), though a bit of heads-up documentation would probably be warranted (namely that selecting subfields just drops siblings from the parent type, so a table consisting of a struct column foo { a, b, c } after filtering with ['foo.a'] results in foo { a }, not a).
    The API looks like this:
let instance = await new AsyncParquetFile("<targetFile>");
const stream = await instance.select([
    'STATE', 'foo_struct.STATE_FIPS'
]).stream(4);
for await (const chunk of stream) {
   console.log(chunk);
   /* rows (after passing into parseRecordBatch) look like
    * {"STATE": "ME", "foo_struct": {"STATE_FIPS":"23"}}
    */
}

Tangent

Exposing the with_row_filter part of the builder interface would complete the transformation of the AsyncParquetFile struct into a (very) primitive equivalent to polars' LazyFrame (without the join abilities of course).

It would be fascinating to see this in the context of some of the less expensive boolean spatial predicates from geoarrow - provided they can be squeezed into the constraints imposed by ArrowPredicate/Fn (which it looks like they can), that would get you full-blown spatial predicate pushdown for well under 10 MB of wasm (more or less instantly paid off by dint of all the saved bandwidth).
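For a sense of the constraints: an ArrowPredicate is essentially a ProjectionMask over the columns the predicate reads plus a closure from RecordBatch to BooleanArray, so a cheap bbox test fits comfortably. A rough sketch - the bbox_xmin/bbox_ymin columns, their leaf indices, and the query window are all hypothetical, and builder is assumed to be an already-constructed ParquetRecordBatchStreamBuilder:

use arrow::array::{Array, BooleanArray, Float64Array};
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
use parquet::arrow::ProjectionMask;

// Keep only rows whose bbox_xmin/bbox_ymin fall inside a query window.
// The mask must cover exactly the columns the closure reads; leaf indices
// 4 and 5 are purely illustrative.
let mask = ProjectionMask::leaves(builder.parquet_schema(), [4, 5]);
let predicate = ArrowPredicateFn::new(mask, |batch: RecordBatch| {
    let xmin = batch.column(0).as_any().downcast_ref::<Float64Array>().unwrap();
    let ymin = batch.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
    // Build the boolean selection row by row (version-agnostic, no kernels).
    Ok((0..batch.num_rows())
        .map(|i| {
            Some(xmin.value(i) >= -125.0 && xmin.value(i) <= -114.0
                && ymin.value(i) >= 32.0 && ymin.value(i) <= 42.0)
        })
        .collect::<BooleanArray>())
});
let builder = builder.with_row_filter(RowFilter::new(vec![Box::new(predicate)]));

ArrowPredicateFn, RowFilter, and with_row_filter are the actual parquet crate pieces; the closure body is where a geoarrow kernel would slot in.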

@kylebarron
Owner Author

  1. At the moment, I'm silently ignoring any input columns that just don't exist - options:
    a. warn, display a succinct repr of the schema and move on; throw if there are absolutely no valid input columns
    b. throw if any of them are invalid (with the schema and a bit of highlighting)

I'd prefer to throw. The usual workflow I'd expect would be fetching the Parquet metadata first, letting the user pick which columns, and then fetching the rest of the data. It's too easy to be off by one character and miss a ton of data.
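Concretely, the throwing variant could be a check like the sketch below, run before any row-group IO happens (validate_columns is an illustrative helper name, not an existing function):

use parquet::errors::ParquetError;
use parquet::schema::types::SchemaDescriptor;

// Reject the whole read if any requested column path doesn't name a leaf in
// the file schema, and echo the valid paths back in the error message.
fn validate_columns(schema: &SchemaDescriptor, requested: &[String]) -> Result<(), ParquetError> {
    let known: Vec<String> = schema.columns().iter().map(|c| c.path().string()).collect();
    for r in requested {
        if !known.iter().any(|k| k == r) {
            return Err(ParquetError::General(format!(
                "unknown column {:?}; file schema has: {}",
                r,
                known.join(", ")
            )));
        }
    }
    Ok(())
}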

  2. Rich reprs for the schema - flipping on the serde feature of arrow_schema seems to be enough.

That seems fine. Ideally the repr won't add much bundle size.

Do you think we could reuse arrow JS's repr? I.e., not make our own and instead direct users to inspect the repr from an arrow JS schema object? You should be able to get a schema across FFI by treating it as a struct of fields.
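If the repr does end up being arrow JS's, the hand-off would be something like the sketch below, using arrow-rs's C data interface types (the two fields are illustrative, and how the struct's address actually gets surfaced through wasm-bindgen is glossed over):

use arrow::datatypes::{DataType, Field, Schema};
use arrow::ffi::FFI_ArrowSchema;

// Export the Arrow schema over the C data interface. On the JS side it can
// be parsed as a struct whose children are the table's fields.
let schema = Schema::new(vec![
    Field::new("STATE", DataType::Utf8, true),
    Field::new("STATE_FIPS", DataType::Utf8, true),
]);
let ffi_schema = FFI_ArrowSchema::try_from(&schema)
    .expect("schema is representable in the C data interface");
// ...hand the address of ffi_schema across the wasm boundary for JS to read.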

  3. Nested schemas - these work perfectly with dot separators (they're embedded in the ColumnDescriptor), though a bit of heads-up documentation would probably be warranted (namely that selecting subfields just drops siblings from the parent type, so a table consisting of a struct column foo { a, b, c } after filtering with ['foo.a'] results in foo { a }, not a).
    The API looks like this:
let instance = await new AsyncParquetFile("<targetFile>");
const stream = await instance.select([
    'STATE', 'foo_struct.STATE_FIPS'
]).stream(4);
for await (const chunk of stream) {
   console.log(chunk);
   /* rows (after passing into parseRecordBatch) look like
    * {"STATE": "ME", "foo_struct": {"STATE_FIPS":"23"}}
    */
}

That all seems reasonable. It's risky to pull columns up to the top level when the leaf names could collide with something else at the root.

I tend to like dot separators, although I was reminded recently that there's nothing stopping a column name from containing a dot itself, right? Would it be better to have something like {columns: [['foo', 'a'], ['foo', 'b']]}? It's more verbose; not sure.
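For comparison, the nested-array form sidesteps that ambiguity because each requested path is matched segment by segment against the leaf's ColumnPath parts rather than against a joined string. A sketch (mask_from_segments is an illustrative name):

use parquet::arrow::ProjectionMask;
use parquet::schema::types::SchemaDescriptor;

// columns like [["foo", "a"], ["foo", "b"]]: compare path segments directly,
// so a literal column named "foo.a" can't be confused with the leaf a nested
// inside struct foo.
fn mask_from_segments(schema: &SchemaDescriptor, columns: &[Vec<String>]) -> ProjectionMask {
    let leaves = schema
        .columns()
        .iter()
        .enumerate()
        .filter(|(_, col)| {
            let parts = col.path().parts();
            columns.iter().any(|segs| {
                parts.len() == segs.len() && parts.iter().zip(segs).all(|(a, b)| a == b)
            })
        })
        .map(|(i, _)| i);
    ProjectionMask::leaves(schema, leaves)
}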

Tangent

Exposing the with_row_filter part of the builder interface would complete the transformation of the AsyncParquetFile struct into a (very) primitive equivalent to polars' LazyFrame (without the join abilities of course).

It would be fascinating to see this in the context of some of the less expensive boolean spatial predicates from geoarrow - provided they can be squeezed into the constraints imposed by ArrowPredicate/Fn (which it looks like they can), that would get you full-blown spatial predicate pushdown for well under 10 MB of wasm (more or less instantly paid off by dint of all the saved bandwidth).

IIRC the row filter only happens after the data is materialized in memory, right? In that case I'd tend to think that parquet-wasm should handle row-group filtering but leave the individual row filtering to other tools, or another function (even in the same memory space). I wrote down some ideas on composition of wasm libraries here.

@H-Plus-Time
Contributor

IIRC the row filter only happens after the data is materialized in memory, right?

Nope, the row filter occurs just before that - the flow is more or less:

  1. Do a fetch of the row group with just the columns required for the predicate (the predicates have to provide an explicit ProjectionMask), plus the row selection mask - e.g. just the state column for r.state == 'CA'.
  2. Evaluate the predicate and update the row selection mask.
  3. Repeat 1-2 in a loop, with each successive pass taking into account the results of the previous ones - so a predicate that evaluates to false for all rows short-circuits IO for subsequent predicates.
  4. If any rows passed all the predicates, spit out byte ranges for all data pages touched by the surviving rows × selected columns (the top-level projection mask). [This is where the per-row-group, per-column requests problem came from - no attempt is made to coalesce byte ranges here.]
  5. The bulk (hopefully) of the IO + decoding.

A simple hardcoded filter I ran (r.state == 'CA') cut the transfer size and time by 60%.

I agree that specifying the construction of the row filters should be external, but it would have to be provided in some form to the record batch stream builder. The host (by that I mean the code passing in the row filter) would have to be in the same memory space too, likely written in Rust.

Other than geoarrow-wasm[full], geoparquet-wasm makes a lot of sense as a consumer of this extension point - there are several quite high-value, low-cost (in terms of bundle size) hardcoded IO-integrated filters that anyone using the module would want (or not care about paying for):

  • bounding box intersection (I used this through pyogrio the other day to skip reading ~90% of the Overture Maps dataset)
  • index equality (H3, S2, Quad, Geohash - identical signature really, given they're all uint64s and you're just specifying a column identifier)
  • polygon intersection (harder, and in a lot of cases requires loading the column's contents anyway)

Outside of that, if someone wants more elaborate expressions in the IO phase, they can build their own wrapper module easily enough, provided with_row_filter(RowFilter) is public (but not wasm-bindgen'd).

@kylebarron
Owner Author

I need to read through that a couple more times to get it, but as a quick note:

  • bounding box intersection (I used this through pyogrio the other day to skip reading ~90% of the Overture Maps dataset)

Yes, I definitely think geoparquet-wasm should have native support for bounding box filtering (ref opengeospatial/geoparquet#191); I think I just wasn't sure whether this filtering should happen at the row group or row level.

@kylebarron
Owner Author

  1. Do a fetch of the row group with just the columns required for the predicate (the predicates have to provide an explicit ProjectionMask), plus the row selection mask - e.g. just the state column for r.state == 'CA'.
  2. Evaluate the predicate and update the row selection mask.
  3. Repeat 1-2 in a loop, with each successive pass taking into account the results of the previous ones - so a predicate that evaluates to false for all rows short-circuits IO for subsequent predicates.
  4. If any rows passed all the predicates, spit out byte ranges for all data pages touched by the surviving rows × selected columns (the top-level projection mask). [This is where the per-row-group, per-column requests problem came from - no attempt is made to coalesce byte ranges here.]
  5. The bulk (hopefully) of the IO + decoding.

Just a note from my learnings in geoarrow-rs: the RowFilter itself doesn't filter out row groups; it only skips decoding of pages. So you need to both filter to specific row groups and also manage the row filter.
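So in practice both knobs end up on the builder: pick row groups from the footer metadata first, then let the RowFilter prune within them. A rough sketch, where builder and predicate are carried over from the earlier sketch, keep_row_group is a hypothetical statistics test (e.g. bbox vs. column min/max), and the ? assumes a Result-returning context:

use parquet::arrow::arrow_reader::RowFilter;

// 1. Row-group level: keep only groups whose footer statistics could match
//    (keep_row_group is a hypothetical check over the row group metadata).
let selected: Vec<usize> = builder
    .metadata()
    .row_groups()
    .iter()
    .enumerate()
    .filter(|(_, rg)| keep_row_group(rg))
    .map(|(i, _)| i)
    .collect();

// 2. Row level: the RowFilter then only prunes pages/rows inside those groups.
let stream = builder
    .with_row_groups(selected)
    .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
    .build()?;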

I agree that specifying the construction of the row filters should be external, but it would have to be provided in some form to the record batch stream builder. The host (by that I mean the code passing in the row filter) would have to be in the same memory space too, likely written in Rust.

I think we could reimplement something like the pyarrow filters. E.g. you can pass in a filter to the dataset object.

That would allow serializing a minimal function definition from JS to Wasm, which could then be added on the Rust side to a RowFilter.
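One possible shape, loosely modelled on pyarrow's (column, op, value) filter tuples - everything here (the struct, the ops, the field names) is a sketch rather than an existing parquet-wasm API:

use serde::Deserialize;

// A minimal, serde-deserializable filter description that JS could pass as
// JSON (or via serde-wasm-bindgen); the Rust side would translate each entry
// into an ArrowPredicateFn and fold them all into one RowFilter.
#[derive(Deserialize)]
#[serde(rename_all = "lowercase")]
enum Op {
    Eq,
    Lt,
    Gt,
}

#[derive(Deserialize)]
struct ColumnFilter {
    column: String,
    op: Op,
    value: serde_json::Value,
}

// e.g. the JSON coming from JS: [{ "column": "state", "op": "eq", "value": "CA" }]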

But still, I think this would be entirely separate from geoparquet-wasm, which would automatically construct all of this from a single bounding box.

@kylebarron
Owner Author

We do have a columns option (https://kylebarron.dev/parquet-wasm/types/esm_parquet_wasm.ReaderOptions.html) to specify which columns to read, but I'm leaving this open to track defining a row filter.

@kylebarron kylebarron changed the title Add reader options to specify column names to read Improve reader options Apr 23, 2024