
Fix: Take batch_size into account when filling snapshots for INCREMENTAL_BY_UNIQUE_KEY models #2616

Merged

Conversation

@erindru (Contributor) commented May 15, 2024

This PR makes the batch_index available when filling snapshots, so that EvaluationStrategy implementations can take it into account when making decisions.

Prior to this, for kinds like INCREMENTAL_BY_UNIQUE_KEY:

  • If you had a model that defines intervals (say, @daily) and runs them in multiple batches (say, batch_size=1, which generates a batch per interval),
  • then when running those batches, SQLMesh would treat them all as "clear table -> insert data" rather than just the first one.

After this change, for snapshots with no existing intervals at plan time, SQLMesh can check the index of the batch and execute the "clear table" logic only for the first batch, and the "merge into existing table" logic for subsequent batches.
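A minimal sketch of that decision, with hypothetical names (the real logic lives in SQLMesh's EvaluationStrategy implementations):

# Hypothetical sketch of the batch-aware decision described above; the
# function and parameter names are illustrative, not SQLMesh's actual API.
def choose_insert_behavior(has_existing_intervals: bool, batch_index: int) -> str:
    """Decide how to write one batch of an INCREMENTAL_BY_UNIQUE_KEY snapshot."""
    if not has_existing_intervals and batch_index == 0:
        # Only the very first batch of a brand-new snapshot may clear the table.
        return "clear table -> insert data"
    # Later batches must merge, or they would wipe out earlier batches' rows.
    return "merge into existing table"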

Fixes #2609 (INCREMENTAL_BY_UNIQUE_KEY models not taking batch_size into account)

@erindru marked this pull request as draft May 15, 2024 22:33
@@ -96,6 +101,20 @@ def dialect(self) -> str:
def current_catalog_type(self) -> str:
return self.engine_adapter.current_catalog_type

@property
def supports_merge(self) -> bool:
Contributor Author

I found that this needed to be checked in multiple places, so I made it a property of the TestContext.

In addition, it has become more complex: it's no longer "the engine either supports merge or it doesn't"; it depends on both the engine and the catalog.

Member

Interesting. What prompted you to change this? Did you run into any issues? Or did you notice that merge is not being used?

Contributor Author (@erindru, May 19, 2024)

I noticed that some other tests, like test_merge(), weren't actually testing all the combinations because they were being skipped where dialect=spark or dialect=trino.

Spark and Trino do support MERGE, just not on all catalogs, so I improved the check.

But I wanted to use the same logic in the test_batch_size_on_incremental_by_unique_key_model test (because, unless I'm missing something, INCREMENTAL_BY_UNIQUE_KEY with batches only works on engines that support MERGE). So rather than duplicate the checks, I made them a property of the TestContext fixture so they could be used in both places.

Arguably, this kind of thing could also live on the EngineAdapter.
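As a rough illustration of the shape this takes (the class layout and the exact catalog rule here are assumptions for the sketch, not the actual TestContext code):

# Illustrative only: a capability check that depends on both engine and catalog.
class TestContext:
    def __init__(self, dialect: str, current_catalog_type: str):
        self.dialect = dialect
        self.current_catalog_type = current_catalog_type  # e.g. "hive", "iceberg"

    @property
    def supports_merge(self) -> bool:
        if self.dialect in ("spark", "trino"):
            # Assumed rule for this sketch: MERGE works on these engines only
            # for catalogs whose table format supports row-level writes.
            return self.current_catalog_type != "hive"
        return True  # assume the other tested engines support MERGE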

@@ -376,6 +395,116 @@ def non_temp_tables(self) -> t.List[str]:
return [x for x in self.tables if not x.startswith("__temp") and not x.startswith("temp")]


class ProjectCreator(PydanticModel):
Contributor Author

I had trouble creating a test case to reproduce the issue because it touched so many concepts.

What I really wanted was to be able to set up a small, isolated SQLMesh project to expose the issue from the user's perspective ("sqlmesh plan throws an error when I use a model defined like so"), so I wrote this small fixture to create a minimal project I could run against the current engine adapter.

I realise there is some overlap with sqlmesh.cli.example_project.init_example_project, but I didn't need the example models and they just slowed things down.
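Roughly the idea, as a sketch (the paths, model text, and Context usage are illustrative, not the fixture's actual implementation):

# Sketch: write a minimal project to disk, then plan/run it like a user would.
from pathlib import Path

from sqlmesh import Context

def make_minimal_project(root: Path) -> Context:
    (root / "models").mkdir(parents=True)
    (root / "config.yaml").write_text("model_defaults:\n  dialect: duckdb\n")
    (root / "models" / "test_model.sql").write_text(
        "MODEL (\n"
        "  name test_schema.test_model,\n"
        "  kind INCREMENTAL_BY_UNIQUE_KEY (unique_key id),\n"
        "  start '2024-01-01',\n"
        "  cron '@daily',\n"
        "  batch_size 1\n"
        ");\n"
        "SELECT 1 AS id, @start_ds AS ds\n"
    )
    return Context(paths=root)  # then e.g. context.plan(auto_apply=True)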

schema_name: str
_context: t.Optional[Context] = PrivateAttr(default=None)

def add_seed(self, model_name: str, columns: t.Dict[str, str], rows: t.List[t.Dict[str, str]]):
Contributor

Check out core/dialect.py::pandas_to_sql.

Or just use create_view in the engine_adapter, which accepts a pandas DataFrame.

Contributor Author

So the goal was to produce a project from a user's perspective, based on the files in the filesystem that a user would create.

A user doesn't create a seed as a pandas DataFrame (unless they're using Python models, I guess); they create a .csv file in the seeds/ directory and expose it in a SQL model using kind SEED.

However, I've implemented the pandas_to_sql version because in this case the seed data is just a vehicle to expose an issue in the INCREMENTAL_BY_UNIQUE_KEY model.
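For reference, the user-facing seed workflow being mimicked looks roughly like this (file names and contents are illustrative):

# Illustrative: the seed workflow a user would follow, written out as a helper.
from pathlib import Path

def add_seed(root: Path) -> None:
    (root / "seeds").mkdir(exist_ok=True)
    (root / "seeds" / "seed_data.csv").write_text("id,item_id\n1,1\n2,2\n")
    # A SEED-kind model exposes the csv to the rest of the project.
    (root / "models" / "seed_model.sql").write_text(
        "MODEL (\n"
        "  name test_schema.seed_model,\n"
        "  kind SEED (path '../seeds/seed_data.csv')\n"
        ");\n"
    )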

Contributor Author

I ended up removing this entirely in favour of adding the core methods to TestContext instead

@erindru force-pushed the issue-2609-incremental-model-batch-size branch from 24b08f2 to aba4fb5 May 16, 2024 04:54
…lement batch_index parameter in Airflow scheduler
@erindru force-pushed the issue-2609-incremental-model-batch-size branch from aba4fb5 to e40130a May 16, 2024 05:02
@erindru (Contributor Author) commented May 16, 2024

@izeigerman I had a go at implementing your suggestions on the Airflow scheduler code, but I ran out of time to add a test case that proves I implemented them correctly.

I'm back on Monday and can pick this up then; otherwise, feel free to finish it off if this bug is blocking someone.

@erindru marked this pull request as ready for review May 16, 2024 12:01
@erindru marked this pull request as draft May 16, 2024 12:02
@@ -37,6 +37,7 @@ class EvaluateCommandPayload(PydanticModel):
end: TimeLike
execution_time: TimeLike
deployability_index: DeployabilityIndex
batch_index: int = 0
Member

[Nit] I don't think we need a default value here, to make sure that all of the command's attributes are set explicitly upstream.

Contributor Author

Fair call, I've removed the default.
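The effect, sketched with a stripped-down payload (fields reduced to the one under discussion, and plain pydantic standing in for SQLMesh's PydanticModel):

# Without a default, validation rejects any upstream caller that forgets to
# pass batch_index explicitly.
from pydantic import BaseModel, ValidationError

class EvaluateCommandPayload(BaseModel):
    batch_index: int  # no default -> must always be set upstream

try:
    EvaluateCommandPayload()
except ValidationError as err:
    print(err)  # reports that batch_index is a required field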

@@ -20,6 +20,9 @@
]
)

# Don't load Airflow example DAGs because they cause visual pollution
docker_compose["x-airflow-common"]["environment"]["AIRFLOW__CORE__LOAD_EXAMPLES"] = "false"
Member

Contributor Author

Do you mean add an ENV entry to Dockerfile.template to set it?

The problem is that it gets overridden again the second the containers are instantiated, because it's explicitly defined in the upstream docker-compose.yml file.

@@ -1,7 +1,7 @@
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:postgresql://airflow-postgres-1:5432/metastore_db</value>
<value>jdbc:postgresql://postgres:5432/metastore_db</value>
Member

Just curious: why did you have to change the hostname?

Contributor Author

I was having trouble with the new docker compose vs. the legacy docker-compose (which I think is Docker Compose V2 vs. V1):

  • The name airflow-postgres-1 is just the container name, autogenerated by docker-compose. Since the service is defined as postgres in docker-compose.yml, the network name that should be referenced is postgres.
  • Something seems to have changed in the networking that docker compose sets up in newer versions: the name airflow-postgres-1 was no longer reachable, but postgres was, so I couldn't get the tests to run without this change (see the sketch below).
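A sketch of the naming rule at play, in the same dict-style compose structure as the diff above (the hive-metastore service and image names here are hypothetical):

# The *service* key is the stable DNS name on the compose network; generated
# container names like "airflow-postgres-1" (project + service + index) are
# not guaranteed to resolve across Compose versions.
docker_compose = {
    "services": {
        "postgres": {"image": "postgres"},  # reachable as host "postgres"
        "hive-metastore": {  # hypothetical service consuming the database
            "image": "example/hive-metastore",
            "environment": {
                "ConnectionURL": "jdbc:postgresql://postgres:5432/metastore_db",
            },
        },
    },
}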

self.target = target


def test_generate_plan_application_dag__batch_index_populated(mocker: MockerFixture, make_snapshot):
Contributor Author

I couldn't find any existing tests for SnapshotDagGenerator, so I added this file.

@erindru marked this pull request as ready for review May 20, 2024 05:24
@izeigerman (Member) left a comment

This looks great, thank you!

@izeigerman merged commit 8ac8a0a into TobikoData:main May 20, 2024
12 checks passed
@erindru deleted the issue-2609-incremental-model-batch-size branch May 20, 2024 19:31