Spark Action to Analyze table #10288

Open
karuppayya wants to merge 5 commits into main from analyze_action
Conversation

karuppayya (Contributor)

This change adds a Spark action to analyze tables.
As part of the analysis, the action generates Apache DataSketches theta sketches for NDV stats and writes them as Puffin files.
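For illustration, an invocation of the action might look like the sketch below. The SparkActions entry-point method name (analyzeTable) and the import paths are assumptions based on the AnalyzeTable interface in this PR, not settled API:

import org.apache.iceberg.actions.AnalyzeTable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.spark.actions.SparkActions;

// Hypothetical usage; the analyzeTable() entry point is an assumption,
// the columns()/stats()/execute() methods follow this PR's interface.
AnalyzeTable.Result result =
    SparkActions.get(spark)
        .analyzeTable(table)
        .columns(ImmutableSet.of("id", "data"))
        .stats(ImmutableSet.of("apache-datasketches-theta-v1"))
        .execute();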

@karuppayya (Contributor Author)

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Computes the statistics of the given columns and stores them as Puffin files. */
Member:
AnalyzeTableSparkAction is a generic name; I see that in the future we want to compute partition stats too, which may not be written as Puffin files.

Either we can change the naming to computeNDVSketches, or make it generic so that any kind of stats can be computed from this.

Member:

Thinking more on this, I think we should just call it computeNDVSketches and not mix it with partition stats.

Contributor Author:

I tried to follow the model of RDBMSes and engines like Trino, which use ANALYZE TABLE <tblName> to collect all table-level stats.
With a procedure-per-stat model, the user has to invoke a procedure/action for every statistic, and with any new stat addition the user needs to update their code to call the new procedure/action.

"not mix it with partition stats."

I think we could have partition stats as a separate action, since it is per partition, whereas this procedure can collect top-level table stats.


@karuppayya
I can see the tests in TestAnalyzeTableAction; it's working fine.
But have we tested in Spark whether it works with a query like
"Analyze table table1 compute statistics"?

Because generally that gives the error
"[NOT_SUPPORTED_COMMAND_FOR_V2_TABLE] ANALYZE TABLE is not supported for v2 tables."

Contributor Author:

Spark does not have the grammar for analyzing these tables.
This PR introduces a Spark action. In a subsequent PR, I plan to introduce an Iceberg procedure to invoke the Spark action.


@rice668 (Contributor) May 30, 2024:

"I see that in the future we want to compute partition stats too, which may not be written as Puffin files."

Hi @ajantha-bhat, I agree with you; otherwise the stats would have a lot of limitations, such as being applicable only to NDV calculated over the entire table.

For example, Trino might want to read the NDV values written by Spark to answer queries. However, if the query has partition filter conditions, then Trino would not be able to use the pre-computed NDV information from Spark. What do you think?

@jeesou May 31, 2024:

Hi @karuppayya, as the above discussion suggests, multiple engines like Spark, Presto, Trino, etc. might want to query the same data, so the sketches generated by Spark (or, say, Presto) must be readable by the other engines.

This question comes up because I ran an ANALYZE query on Presto, and the Puffin file it created looks like this:

{"blobs":[{"type":"apache-datasketches-theta-v1","fields":[2],"snapshot-id":7724902347602477706,
"sequence-number":1,"offset":44,"length":40,"properties":{"ndv":"3"}}],"properties":{"created-by":"presto-testversion"}}

whereas the one created by Iceberg through the changes in this PR looks like this:

{"blobs":[{"type":"apache-datasketches-theta-v1","fields":[3],
"snapshot-id":5334747061548805461,"sequence-number":1,"offset":4,"length":32}],"properties"

Looking closely, the {"ndv":"3"} portion is missing in the Iceberg-created file.

Can we make modifications here, or do you have any suggestions?
As per my understanding, the sketch file should be universal across all engines.

Contributor Author:

@jeesou
Yes, agreed that the sketch needs to be compatible across all engines.
This PR takes care of using the same library (Apache DataSketches) as Trino does. (This was the major concern here.)
Do we need to add the ndv property? Shouldn't engines be reading the value from the sketch?
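For reference, an engine can derive the estimate directly from the serialized sketch; a minimal sketch of that, assuming the raw apache-datasketches-theta-v1 blob bytes have already been read out of the Puffin file:

import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.CompactSketch;

// Derive the NDV estimate from a serialized theta sketch blob.
static long ndvFromBlob(byte[] blobBytes) {
  CompactSketch sketch = CompactSketch.wrap(Memory.wrap(blobBytes));
  return Math.round(sketch.getEstimate());
}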

Contributor:

Hm, this discussion makes me wonder if we're under-spec'd in this regard. According to the spec:

https://iceberg.apache.org/puffin-spec/#blob-types

"The blob metadata for this blob may include following properties:
    ndv: estimate of number of distinct values, derived from the sketch."

It really seems like we should take a stance: either it must be in the sketch or it must be in the properties. "may include" seems a little too loose.
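If the property were to be populated, it could be attached where the blob is written; a hedged sketch, assuming Iceberg's puffin Blob constructor and a computed theta sketch (fieldId, snapshotId, sequenceNumber, and sketch are stand-ins, not names from this PR):

import java.nio.ByteBuffer;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.StandardBlobTypes;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

// Record the derived NDV as a blob property so engines that do not
// deserialize the sketch can still read the estimate.
Blob blob =
    new Blob(
        StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1,
        ImmutableList.of(fieldId),
        snapshotId,
        sequenceNumber,
        ByteBuffer.wrap(sketch.toByteArray()),
        null, // uncompressed
        ImmutableMap.of("ndv", Long.toString(Math.round(sketch.getEstimate()))));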

spark(), table, columnsToBeAnalyzed.toArray(new String[0]));
table
.updateStatistics()
.setStatistics(table.currentSnapshot().snapshotId(), statisticsFile)
Member:

What if the table's current snapshot is modified concurrently by another client between lines 117 and 120?
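One way to avoid that race is to pin the snapshot ID once and use it both for computing the sketches and for the statistics commit; a rough sketch (computeStatisticsFile is a hypothetical stand-in for this PR's generation step):

// Pin the snapshot so the computed statistics and the commit target
// the same snapshot even if another client commits in between.
long snapshotId = table.currentSnapshot().snapshotId();
StatisticsFile statisticsFile = computeStatisticsFile(spark(), table, snapshotId);
table.updateStatistics().setStatistics(snapshotId, statisticsFile).commit();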


public static Iterator<Tuple2<String, ThetaSketchJavaSerializable>> computeNDVSketches(
SparkSession spark, String tableName, String... columns) {
String sql = String.format("select %s from %s", String.join(",", columns), tableName);
Member:

I think we should also think about incremental updates, updating sketches from a previous checkpoint. Querying the whole table may not be efficient.

Contributor Author:

Yes, incremental updates need to be wired into the write paths.
This procedure could exist in parallel and compute stats for the whole table on demand.
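For what it's worth, theta sketches are mergeable, so an incremental path could union a sketch computed over only the newly written data with the previously persisted one; a conceptual sketch, assuming the previous blob bytes are available as previousBlobBytes:

import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.CompactSketch;
import org.apache.datasketches.theta.SetOperation;
import org.apache.datasketches.theta.Union;

// Merge the previous checkpoint's sketch with one covering only new rows.
Union union = SetOperation.builder().buildUnion();
union.union(CompactSketch.wrap(Memory.wrap(previousBlobBytes))); // previous checkpoint
union.union(newDataSketch.compact());                            // sketch over appended rows
CompactSketch merged = union.getResult();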

assumeTrue(catalogName.equals("spark_catalog"));
sql(
"CREATE TABLE %s (id int, data string) USING iceberg TBLPROPERTIES"
+ "('format-version'='2')",
Member:

The default format version is v2 now, so specifying it again is redundant.

String path = operations.metadataFileLocation(String.format("%s.stats", UUID.randomUUID()));
OutputFile outputFile = fileIO.newOutputFile(path);
try (PuffinWriter writer =
Puffin.write(outputFile).createdBy("Spark DistinctCountProcedure").build()) {
Member:

I like this name instead of "analyze table procedure".

@ajantha-bhat (Member):

there was an old PR on the same: #6582

@huaxingao (Contributor):

"there was an old PR on the same: #6582"

I don't have time to work on this, so karuppayya will take over. Thanks a lot @karuppayya for continuing the work.

@amogh-jahagirdar (Contributor) left a comment:

Thanks @karuppayya @huaxingao @szehon-ho, this is awesome to see! I left a review of the API/implementation; I still have yet to review the tests, which look to be a WIP.

* @param statsToBeCollected set of statistics to be collected
* @return this for method chaining
*/
AnalyzeTable stats(Set<String> statsToBeCollected);
Contributor:

Should these stats be a Set<StandardBlobType> instead of arbitrary Strings? I feel like the API becomes better defined in this case.

Contributor:

Oh I see, StandardBlobTypes defines string constants, not enums.
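For context, the constants live in org.apache.iceberg.puffin.StandardBlobTypes, so callers could still pass them through the String-based API; a small sketch (action is a stand-in for a configured AnalyzeTable):

import org.apache.iceberg.puffin.StandardBlobTypes;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;

// The stats are plain strings, but the well-known values come from StandardBlobTypes.
action.stats(ImmutableSet.of(StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1));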

Comment on lines 89 to 98
private void validateColumns() {
  validateEmptyColumns();
  validateTypes();
}

private void validateEmptyColumns() {
  if (columnsToBeAnalyzed == null || columnsToBeAnalyzed.isEmpty()) {
    throw new ValidationException("No columns to analyze for table %s", table.name());
  }
}
Contributor:

Nit: I think this validation should just happen at the time of setting these on the action rather than at execution time.

* @return this for method chaining
*/
AnalyzeTable stats(Set<String> statsToBeCollected);

Contributor:

I also think this interface should have a snapshot API to allow users to pass in a snapshot to generate the statistics for. If it's not specified then we can generate the statistics for the latest snapshot.

Collaborator:

Should we support branch/tag as well? (I guess in a subsequent PR)

Comment on lines 104 to 106
if (field == null) {
throw new ValidationException("No column with %s name in the table", columnName);
}
Contributor:

Style nit: new line after the if

SparkSession spark, Table table, long snapshotId, String... columnsToBeAnalyzed)
throws IOException {
Iterator<Tuple2<String, ThetaSketchJavaSerializable>> tuple2Iterator =
NDVSketchGenerator.computeNDVSketches(spark, table.name(), snapshotId, columnsToBeAnalyzed);
Contributor:

Does computeNDVSketches need to be public? It seems like it can just be package-private. Also, nit: either way I don't think you need the fully qualified method name.

import org.apache.datasketches.theta.Sketches;
import org.apache.datasketches.theta.UpdateSketch;

public class ThetaSketchJavaSerializable implements Serializable {
Contributor:

Does this need to be public?

Comment on lines +46 to +51
if (sketch == null) {
return null;
}
if (sketch instanceof UpdateSketch) {
return sketch.compact();
}
Contributor:

Style nit: new line after if

null,
ImmutableMap.of()));
}
writer.finish();
Contributor:

Nit: I don't think you need the writer.finish(), because the try-with-resources will close the writer, and close already finishes.

table.currentSnapshot().snapshotId(),
table.currentSnapshot().sequenceNumber(),
ByteBuffer.wrap(sketchMap.get(columns.get(i)).getSketch().toByteArray()),
null,
Contributor:

null means that the file will be uncompressed. I think it makes sense not to compress these files by default since the sketch will be a single long per column, so it'll be quite small already and not worth paying the price of compression/decompression.

Comment on lines +157 to +168
if (sketch1.getSketch() == null && sketch2.getSketch() == null) {
return emptySketchWrapped;
}
if (sketch1.getSketch() == null) {
return sketch2;
}
if (sketch2.getSketch() == null) {
return sketch1;
}
Contributor:

Style nit: new line after if

@karuppayya force-pushed the analyze_action branch 3 times, most recently from 5538f6e to de520fc on June 4, 2024 at 17:55
@szehon-ho (Collaborator) left a comment:

Hi @karuppayya, thanks for the patch. I left a first round of comments.

* @param columns a set of column names to be analyzed
* @return this for method chaining
*/
AnalyzeTable columns(Set<String> columns);
Collaborator:

Nit: how about String... columns (see RewriteDataFiles)? Same for the others.

* @param statsToBeCollected set of statistics to be collected
* @return this for method chaining
*/
AnalyzeTable stats(Set<String> statsToBeCollected);
Collaborator:

Let's call it statistics? Like StatisticsFile. Per https://iceberg.apache.org/contribute/#java-style-guidelines, I think it can be interpreted differently, but point 3 implies we should use the full spelling if possible, and we don't have abbreviations in API method names in most of the code.

Collaborator:

Also, statsToBeCollected => types?

AnalyzeTable columns(Set<String> columns);

/**
* A set of statistics to be collected on the given columns of the given table
Collaborator:

"The set of statistics to be collected"? (given columns and given table are specified elsewhere)

*/
AnalyzeTable snapshot(String snapshotId);

/** The action result that contains a summary of the Analysis. */
Collaborator:

Plural? "contains summaries of the analysis"?

Also, if capitalized, it can be a Javadoc link.

* @return this for method chaining
*/
AnalyzeTable stats(Set<String> statsToBeCollected);

Collaborator:

Should we support branch/tag as well? (I guess in a subsequent PR)

(PairFlatMapFunction<Iterator<Row>, String, String>)
input -> {
final List<Tuple2<String, String>> list = Lists.newArrayList();
while (input.hasNext()) {
Collaborator:

Can we use flatMap and mapToPair to make this more concise?

data.javaRDD().flatMap(r -> {
      List<Tuple2<String, String>> list = Lists.newArrayListWithExpectedSize(columns.size());
      for (int i = 0; i < r.size(); i++) {
        list.add(new Tuple2<>(columns.get(i), r.get(i).toString()));
      }
      return list.iterator();
    }).mapToPair(t -> t);

return ImmutableAnalyzeTable.Result.builder().analysisResults(analysisResults).build();
}

private boolean analyzeableTypes(Set<String> columns) {
Collaborator:

According to IntelliJ, there is a typo (analyzable).

final JavaPairRDD<String, ThetaSketchJavaSerializable> sketches =
pairs.aggregateByKey(
new ThetaSketchJavaSerializable(),
1, // number of partitions
Collaborator:

Why limit to 1?

Contributor Author:

This code was just copied from a DataSketches example.
This value is used by the HashPartitioner behind the scenes.
Should we set it to spark.sql.shuffle.partitions?
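A minimal sketch of wiring that in, reading the session's shuffle parallelism instead of hard-coding one partition (addFn and combineFn stand in for this PR's aggregation functions):

// Use the session's configured shuffle parallelism for the aggregation.
int numPartitions =
    Integer.parseInt(spark.conf().get("spark.sql.shuffle.partitions", "200"));

JavaPairRDD<String, ThetaSketchJavaSerializable> sketches =
    pairs.aggregateByKey(new ThetaSketchJavaSerializable(), numPartitions, addFn, combineFn);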

return sketches.toLocalIterator();
}

static class Add
Collaborator:

Can we use lambdas here for cleaner code? Like:

 (sketch, val) -> {
   sketch.update(val);
   return sketch;
 },

The next one may be too complex to inline, but maybe we can reduce the ugly Java boilerplate.

final Row row = input.next();
int size = row.size();
for (int i = 0; i < size; i++) {
list.add(new Tuple2<>(columns.get(i), row.get(i).toString()));
@szehon-ho (Collaborator) Jun 6, 2024:

Question: does forcing the string type affect anything? I see the sketch library takes in other types.
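For context, UpdateSketch has typed overloads, and the hash depends on which one is used, so engines must agree on the input representation for their estimates to line up; a small illustration, assuming the DataSketches theta API:

import org.apache.datasketches.theta.UpdateSketch;

UpdateSketch a = UpdateSketch.builder().build();
UpdateSketch b = UpdateSketch.builder().build();
a.update(42L);  // hashes the 8-byte long value
b.update("42"); // hashes the UTF-8 string bytes: a different hash
// The sketches now disagree even though they saw the same logical value.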
