[Spark] Managed Commits: add a DynamoDB-based commit owner #3107

Merged: 1 commit, May 20, 2024


dhruvarya-db
Contributor

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

Taking inspiration from #339, this PR adds a Commit Owner Client which uses DynamoDB as the backend. Each Delta table managed by a DynamoDB instance will have one corresponding entry in a DynamoDB table. The table schema is as follows:

  • tableId: String --- The unique identifier for the entry. This is a UUID.
  • path: String --- The fully qualified path of the table in the file system. e.g. s3://bucket/path.
  • acceptingCommits: Boolean --- Whether the commit owner is accepting new commits. This will only be set to false when the table is converted from managed commits to file system commits.
  • tableVersion: Number --- The version of the latest commit.
  • tableTimestamp: Number --- The inCommitTimestamp of the latest commit.
  • schemaVersion: Number --- The version of the schema used to store the data.
  • commits: --- The list of unbackfilled commits.
    • version: Number --- The version of the commit.
    • inCommitTimestamp: Number --- The inCommitTimestamp of the commit.
    • fsName: String --- The name of the unbackfilled file.
    • fsLength: Number --- The length of the unbackfilled file.
    • fsTimestamp: Number --- The modification time of the unbackfilled file.
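
For readers who prefer code, the entry layout above could be modelled roughly as follows. This is an illustrative sketch only: the names `SchemaSketch`, `TableEntry` and `UnbackfilledCommit` and all example values are hypothetical, Java types such as `long` are assumed for the Number attributes, and the actual client stores these fields as DynamoDB attributes rather than Java records.

```java
import java.util.List;

class SchemaSketch {
  // Hypothetical model of one item in the `commits` list.
  record UnbackfilledCommit(
      long version, long inCommitTimestamp, String fsName, long fsLength, long fsTimestamp) {}

  // Hypothetical model of one DynamoDB entry (one per managed Delta table).
  record TableEntry(
      String tableId, String path, boolean acceptingCommits, long tableVersion,
      long tableTimestamp, int schemaVersion, List<UnbackfilledCommit> commits) {}

  // Builds an example entry; every value here is made up for illustration.
  static TableEntry example() {
    UnbackfilledCommit c = new UnbackfilledCommit(
        1L, 1716200000000L, "1.example-uuid.json", 1024L, 1716200000500L);
    return new TableEntry(
        "example-table-id", "s3://bucket/path", true, 1L, 1716200000000L, 0, List.of(c));
  }

  public static void main(String[] args) {
    System.out.println(example());
  }
}
```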

For a table to be managed by DynamoDB, registerTable must be called for that Delta table. This creates a new entry in the DynamoDB table for this Delta table. Every commit invocation appends the file status of the UUID delta file to the commits list in the table entry. Each commit is performed through a conditional write in DynamoDB.
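
The conditional-write idea can be illustrated with a small in-memory stand-in. This is a sketch under stated assumptions, not the PR's implementation: the class `ConditionalCommitSketch`, its methods, and the file names are hypothetical, and the condition used here (stored version must equal attemptVersion - 1 and the entry must still accept commits) is an assumption, since the description does not spell out the exact condition expression.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ConditionalCommitSketch {
  // Hypothetical in-memory stand-in for one DynamoDB table entry.
  static final class Entry {
    long tableVersion;
    boolean acceptingCommits = true;
    final List<String> commits = new ArrayList<>();
    Entry(long tableVersion) { this.tableVersion = tableVersion; }
  }

  private final Map<String, Entry> entries = new HashMap<>();

  // Models registerTable: creates the entry for a table at the given version.
  synchronized void registerTable(String tableId, long version) {
    entries.put(tableId, new Entry(version));
  }

  // Models a conditional write: the update is applied only if the stored
  // version is exactly attemptVersion - 1 (assumed condition) and the entry
  // still accepts commits. Returns false when the condition fails.
  synchronized boolean commit(String tableId, long attemptVersion, String uuidFileName) {
    Entry e = entries.get(tableId);
    if (e == null || !e.acceptingCommits || e.tableVersion != attemptVersion - 1) {
      return false;
    }
    e.commits.add(uuidFileName);
    e.tableVersion = attemptVersion;
    return true;
  }

  synchronized long latestVersion(String tableId) {
    return entries.get(tableId).tableVersion;
  }

  public static void main(String[] args) {
    ConditionalCommitSketch store = new ConditionalCommitSketch();
    store.registerTable("t1", 0);
    System.out.println(store.commit("t1", 1, "1.uuid-a.json")); // first writer succeeds
    System.out.println(store.commit("t1", 1, "1.uuid-b.json")); // same version is rejected
    System.out.println(store.latestVersion("t1"));
  }
}
```

In this model, two writers racing for the same version cannot both succeed, which is the property a conditional write is meant to provide.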

How was this patch tested?

Added a new suite called DynamoDBCommitOwnerClient5BackfillSuite, which uses a mock DynamoDB client, plus manual testing against a DynamoDB instance.

Does this PR introduce any user-facing changes?

assert(tableCommitOwnerClient.getCommits() == GetCommitsResponse(Seq.empty, -1))
var expectedVersion = -1
if (tableCommitOwnerClient.commitOwnerClient.isInstanceOf[DynamoDBCommitOwnerClient]) {
// DynamoDBCommitOwnerClient stores attemptVersion as the current table version when
Collaborator

DynamoDBCommitOwnerClient.getCommits starts advertising attemptVersion as the current commit version even without knowledge around whether the commit has gone through or not. Practically this won't cause any issue because until the commit is done, no client will invoke DynamoDBCommitOwnerClient.getCommits since the table was a FS table. But just for the sake of semantics, we should fix it in a followup PR.

Contributor Author

Yes, for DynamoDB, we treat registerTable as a valid commit and store the attemptVersion instead of the current Delta table version. This is not risky because the DynamoDB table entry is only treated as valid if the corresponding filesystem commit goes through (which means that the attemptVersion is in fact the Delta table version). In case of a filesystem conflict, it is expected that registerTable will be called again. However, for consistency, we can use another bool to track whether any commits have gone through DynamoDB and return -1.
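
The extra boolean proposed in this reply might look something like the sketch below. Everything here is hypothetical (the class `RegisteredEntrySketch`, the flag name `hasAcceptedCommits`, and the method names); it only illustrates the idea of reporting -1 until a commit has actually gone through the commit owner, and the follow-up PR may have taken a different approach.

```java
class RegisteredEntrySketch {
  private long tableVersion;                  // set to attemptVersion at registration
  private boolean hasAcceptedCommits = false; // hypothetical tracking flag

  // Models registerTable storing the attemptVersion.
  RegisteredEntrySketch(long attemptVersion) {
    this.tableVersion = attemptVersion;
  }

  // Models a successful commit through the commit owner.
  void recordCommit(long version) {
    this.tableVersion = version;
    this.hasAcceptedCommits = true;
  }

  // Version advertised by getCommits: -1 until any commit has gone through.
  long advertisedVersion() {
    return hasAcceptedCommits ? tableVersion : -1L;
  }

  public static void main(String[] args) {
    RegisteredEntrySketch entry = new RegisteredEntrySketch(5L);
    System.out.println(entry.advertisedVersion()); // nothing committed yet
    entry.recordCommit(6L);
    System.out.println(entry.advertisedVersion());
  }
}
```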

5 /* readCapacityUnits */,
5 /* writeCapacityUnits */,
false /* skipPathCheck */);
tryEnsureTableExists();
Collaborator

We are calling the other constructor here, so won't it take care of invoking tryEnsureTableExists?

Contributor Author

You are right, will fix
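
The point raised in this thread can be shown with a minimal constructor-delegation sketch. The class and field names below are hypothetical and the constructor signatures are simplified; only the pattern (the delegated constructor already runs the table check, so the delegating one should not repeat it) reflects the discussion.

```java
class ConstructorChainSketch {
  int ensureCalls = 0; // counts invocations, for illustration only
  final String tableName;
  final int readCapacityUnits;
  final int writeCapacityUnits;

  // Convenience constructor: delegates with default capacities.
  ConstructorChainSketch(String tableName) {
    this(tableName, 5 /* readCapacityUnits */, 5 /* writeCapacityUnits */);
    // No tryEnsureTableExists() call here: the delegated constructor ran it.
  }

  // Full constructor: the single place where the table check happens.
  ConstructorChainSketch(String tableName, int readCapacityUnits, int writeCapacityUnits) {
    this.tableName = tableName;
    this.readCapacityUnits = readCapacityUnits;
    this.writeCapacityUnits = writeCapacityUnits;
    tryEnsureTableExists();
  }

  // Stand-in for the real check; here it only records that it ran.
  private void tryEnsureTableExists() {
    ensureCalls++;
  }

  public static void main(String[] args) {
    System.out.println(new ConstructorChainSketch("example").ensureCalls);
  }
}
```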

return tableConf;
}

// Copied from DynamoDbLogStore. TODO: add the logging back.
Collaborator

What logging needs to be added back?

Contributor Author

  1. That the table exists/does not exist
  2. Table creation attempt
  3. Table creation failure

I will add these, and logs in other parts of this class, in a follow-up PR.

String dynamoDBEndpoint = conf.get(DYNAMO_DB_ENDPOINT_KEY).getOrElse(() -> {
throw new RuntimeException(DYNAMO_DB_ENDPOINT_KEY + " not found");
});
String awsCredentialsProviderName = conf.get(AWS_CREDENTIALS_PROVIDER_KEY).getOrElse( () ->
Collaborator

Suggested change
- String awsCredentialsProviderName = conf.get(AWS_CREDENTIALS_PROVIDER_KEY).getOrElse( () ->
+ String awsCredentialsProviderName = conf.get(AWS_CREDENTIALS_PROVIDER_KEY).getOrElse(() ->

String dynamoDBEndpoint = conf.get(DYNAMO_DB_ENDPOINT_KEY).getOrElse(() -> {
throw new RuntimeException(DYNAMO_DB_ENDPOINT_KEY + " not found");
});
String awsCredentialsProviderName = conf.get(AWS_CREDENTIALS_PROVIDER_KEY).getOrElse( () ->
Collaborator

Should AWS_CREDENTIALS_PROVIDER_KEY come from the commitOwnerConf from DeltaLog? Isn't this more of an authentication config that should be configured at the JVM level, i.e. via SparkSession?

Contributor Author

Yes, once the changes around passing sparkSession to the CommitOwnerBuilder have baked in, I will update this to read it directly from sparkSession. For now, I will hardcode this to the default credentials provider.


private static final String MANAGED_COMMITS_TABLE_NAME_KEY = "managedCommitsTableName";
private static final String DYNAMO_DB_ENDPOINT_KEY = "dynamoDBEndpoint";
private static final String AWS_CREDENTIALS_PROVIDER_KEY = "awsCredentialsProvider";
Collaborator

Add comments for these properties.

* signature. This method wraps the write method and declares the exception to ensure that the
* caller is aware of the exception.
*/
private void writeActionsToFile(
Contributor

Should we call this writeActionsToBackfilledFile?

"The requested backfill version " + version + " is greater than the latest " +
"version " + resp.getLatestTableVersion() + " for the table.");
}
// If partial writes a visible in this filesystem, we should not try to overwrite existing
Contributor

Suggested change
- // If partial writes a visible in this filesystem, we should not try to overwrite existing
+ // If partial writes are visible in this filesystem, we should not try to overwrite existing

@@ -43,7 +43,7 @@ case class Commit(
case class CommitFailedException(
private val retryable: Boolean,
private val conflict: Boolean,
- private val message: String) extends Exception(message) {
+ private val message: String) extends RuntimeException(message) {
Contributor

Why is this change needed? Isn't that just making it more restrictive?

Contributor Author

This is needed because the CommitOwnerClient trait does not declare that commit can throw CommitFailedException in the signature (Scala doesn't let us do that). The Java code won't compile if I try to throw an exception of type Exception if it has not been declared in the function signature. RuntimeExceptions can be thrown though.
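
A small standalone Java example of the rule described here, using hypothetical names (`CommitClient`, `CommitFailed`, `RejectingClient`) rather than the actual Delta classes: an interface method with no `throws` clause can still have an implementation that throws an unchecked exception, whereas a checked one would be a compile error.

```java
class UncheckedExceptionSketch {
  // Stand-in for an interface whose method declares no checked exceptions,
  // as is the case for a method defined on a Scala trait.
  interface CommitClient {
    void commit(long version);
  }

  // Unchecked (extends RuntimeException), so no `throws` declaration is needed.
  static final class CommitFailed extends RuntimeException {
    final boolean retryable;
    final boolean conflict;

    CommitFailed(boolean retryable, boolean conflict, String message) {
      super(message);
      this.retryable = retryable;
      this.conflict = conflict;
    }
  }

  static final class RejectingClient implements CommitClient {
    @Override
    public void commit(long version) {
      // If CommitFailed extended Exception instead, this would not compile,
      // because commit() declares no checked exceptions.
      throw new CommitFailed(true, true, "conflict at version " + version);
    }
  }

  public static void main(String[] args) {
    try {
      new RejectingClient().commit(3L);
    } catch (CommitFailed e) {
      System.out.println(e.getMessage() + " retryable=" + e.retryable);
    }
  }
}
```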

vkorukanti merged commit 7b4ee63 into delta-io:master on May 20, 2024
9 of 10 checks passed
4 participants