Fix: Take batch_size into account when filling snapshots for INCREMENTAL_BY_UNIQUE_KEY models #2616
Conversation
@@ -96,6 +101,20 @@ def dialect(self) -> str:
    def current_catalog_type(self) -> str:
        return self.engine_adapter.current_catalog_type

    @property
    def supports_merge(self) -> bool:
I found that this needed to be checked in multiple places, so I made it a property of the TestContext.

In addition, it's become more complex: it's no longer "the engine either supports merge or it doesn't"; it depends on both the engine and the catalog.
Interesting. What prompted you to change this? Did you run into any issues? Or did you notice that merge is not being used?
I noticed that some other tests, like test_merge(), weren't actually testing all the combinations because they were being skipped where dialect=spark or dialect=trino. Spark and Trino do support MERGE, just not on all catalogs, so I improved the check.

But I wanted to use the same logic in the test_batch_size_on_incremental_by_unique_key_model test (because, unless I'm missing something, INCREMENTAL_BY_UNIQUE_KEY with batches only works on engines that support MERGE). So rather than duplicate the checks, I made them a property of the TestContext fixture so they could be used in both places.

Arguably this kind of thing could also live on the EngineAdapter.
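For illustration, a minimal sketch of what such a property could look like (the attribute names and the "hive" catalog check below are assumptions for the example, not the actual TestContext logic):

```python
from dataclasses import dataclass


@dataclass
class TestContextSketch:
    """Hypothetical stand-in for the TestContext fixture discussed above."""

    dialect: str
    current_catalog_type: str

    @property
    def supports_merge(self) -> bool:
        # MERGE support depends on both the engine and the catalog: Spark and Trino
        # do support MERGE, just not on every catalog/table format. The "hive" check
        # is an assumed example of such a rule, not the real one from this PR.
        if self.dialect in ("spark", "trino"):
            return self.current_catalog_type != "hive"
        return True


# Any test that requires MERGE can now skip itself consistently:
ctx = TestContextSketch(dialect="trino", current_catalog_type="hive")
assert ctx.supports_merge is False
```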
@@ -376,6 +395,116 @@ def non_temp_tables(self) -> t.List[str]:
        return [x for x in self.tables if not x.startswith("__temp") and not x.startswith("temp")]


class ProjectCreator(PydanticModel):
I had trouble creating a test case to reproduce the issue because it touched so many concepts. What I really wanted was to be able to set up a small, isolated SQLMesh project to expose the issue from the user's perspective (sqlmesh plan throws an error when I use a model defined like so), so I wrote this small fixture to create a minimal project I could run against the current engine adapter.

I realise there is some overlap with sqlmesh.cli.example_project.init_example_project, but I didn't need the example models and they just slowed things down.
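Roughly the shape of the idea, as a sketch (the config, model contents, and helper name below are illustrative assumptions rather than the fixture's final API; the point is "write only the files a user would create, then load them as a Context"):

```python
from pathlib import Path

from sqlmesh import Context

# Assumed minimal config; the real fixture would be parameterised by the engine under test.
CONFIG = """gateways:
  duckdb:
    connection:
      type: duckdb
model_defaults:
  dialect: duckdb
"""

# Illustrative model exercising the kind under discussion.
MODEL = """MODEL (
  name demo.incremental_model,
  kind INCREMENTAL_BY_UNIQUE_KEY (
    unique_key id,
    batch_size 1
  ),
  cron '@daily',
  start '2024-01-01'
);
SELECT 1 AS id, @start_ds AS ds
"""


def create_minimal_project(tmp_path: Path) -> Context:
    """Write just a config and the model under test, then load the project as a Context."""
    (tmp_path / "models").mkdir(parents=True, exist_ok=True)
    (tmp_path / "config.yaml").write_text(CONFIG)
    (tmp_path / "models" / "incremental_model.sql").write_text(MODEL)
    return Context(paths=[str(tmp_path)])


# A test could then run create_minimal_project(tmp_path).plan(auto_apply=True)
# to reproduce the issue from the user's perspective.
```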
    schema_name: str
    _context: t.Optional[Context] = PrivateAttr(default=None)

    def add_seed(self, model_name: str, columns: t.Dict[str, str], rows: t.List[t.Dict[str, str]]):
Check out core/dialect.py::pandas_to_sql, or just use create_view in the engine_adapter, which accepts a pandas DataFrame.
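As a sketch of that suggestion (assuming only what the comment above states, i.e. that create_view accepts a DataFrame; the helper and names are made up for illustration):

```python
import pandas as pd


def seed_from_dataframe(engine_adapter, view_name: str) -> None:
    # Per the suggestion above, EngineAdapter.create_view accepts a pandas DataFrame,
    # so seed data can be exposed as a relation without going through CSV files.
    seed_df = pd.DataFrame([{"id": 1, "name": "a"}, {"id": 2, "name": "b"}])
    engine_adapter.create_view(view_name, seed_df)


# Assumed usage in a test: seed_from_dataframe(ctx.engine_adapter, "test_schema.seed_data")
```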
So the goal was to produce a project from a user perspective, based on the files in the filesystem that a user would create. A user doesn't create a seed as a pandas DataFrame (unless they're using Python models, I guess); they create a .csv file in the seeds/ directory and expose it in a SQL model using kind SEED.

However, I've implemented the pandas_to_sql version because in this case the seed data is just a vehicle to expose an issue in the INCREMENTAL_BY_UNIQUE_KEY model.
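For comparison, the file-based "user perspective" seed described above would look roughly like this (the project layout, file names, and model name are placeholders):

```python
from pathlib import Path

project = Path("example_project")  # assumed project root
(project / "seeds").mkdir(parents=True, exist_ok=True)
(project / "models").mkdir(parents=True, exist_ok=True)

# The CSV a user would drop into seeds/ ...
(project / "seeds" / "items.csv").write_text("id,name\n1,a\n2,b\n")

# ... and the SQL model exposing it with kind SEED.
(project / "models" / "seed_items.sql").write_text(
    """MODEL (
  name demo.seed_items,
  kind SEED (
    path '../seeds/items.csv'
  )
);
"""
)
```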
I ended up removing this entirely in favour of adding the core methods to TestContext instead.
@izeigerman I had a go at implementing your suggestions on the Airflow scheduler code, but I ran out of time to add a test case that proves I implemented them correctly. I'm back on Monday and can pick this up then; otherwise feel free to finish it off if this bug is blocking someone.
sqlmesh/engines/commands.py (Outdated)
@@ -37,6 +37,7 @@ class EvaluateCommandPayload(PydanticModel):
    end: TimeLike
    execution_time: TimeLike
    deployability_index: DeployabilityIndex
    batch_index: int = 0
[Nit] I don't think we need a default value here, to make sure that all of the command's attributes are set explicitly upstream.
Fair call, I've removed the default.
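A tiny illustration of the effect (a hypothetical model, not the real EvaluateCommandPayload): without a default, pydantic forces the upstream caller to set the field explicitly.

```python
from pydantic import BaseModel, ValidationError


class PayloadSketch(BaseModel):
    start: str
    end: str
    batch_index: int  # no default, so omitting it is a validation error


try:
    PayloadSketch(start="2024-01-01", end="2024-01-02")
except ValidationError as err:
    print("batch_index must be provided explicitly:", err.errors()[0]["loc"])

payload = PayloadSketch(start="2024-01-01", end="2024-01-02", batch_index=0)
```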
@@ -20,6 +20,9 @@
    ]
)

# Dont load Airflow example DAGs because they cause visual pollution
docker_compose["x-airflow-common"]["environment"]["AIRFLOW__CORE__LOAD_EXAMPLES"] = "false"
Nice 👍 Let's move this into Dockerfile.template: https://github.com/TobikoData/sqlmesh/blob/main/examples/airflow/Dockerfile.template#L49
Do you mean add an ENV entry to Dockerfile.template to set it?

The problem is that it gets overridden again the second the containers are instantiated, because it's explicitly defined in the upstream docker-compose.yml file.
@@ -1,7 +1,7 @@
 <configuration>
   <property>
     <name>javax.jdo.option.ConnectionURL</name>
-    <value>jdbc:postgresql://airflow-postgres-1:5432/metastore_db</value>
+    <value>jdbc:postgresql://postgres:5432/metastore_db</value>
Just curious: why did you have to change the hostname?
I was having trouble with the new docker compose vs the legacy docker-compose (which I think is Docker Compose V2 vs V1).

- The name airflow-postgres-1 is just the container name, autogenerated by docker-compose. Since the service is defined as postgres in the docker-compose.yml, the network name that should be referenced is postgres.
- Something seems to have changed in the networking that docker compose sets up in newer versions. The name airflow-postgres-1 was not reachable anymore, but postgres was. So I couldn't get the tests to run without this change.
        self.target = target


def test_generate_plan_application_dag__batch_index_populated(mocker: MockerFixture, make_snapshot):
I couldn't find any existing tests for SnapshotDagGenerator, so I added this file.
This looks great, thank you!
This PR makes the batch_index available when filling snapshots, so EvaluationStrategy's can take it into account when making decisions.

Prior to this, for kind's like INCREMENTAL_BY_UNIQUE_KEY, if a model (e.g. with a @daily cron) is run in multiple batches (say, batch_size=1, which generates a Snapshot per interval), SQLMesh would treat all of those Snapshot's as "clear table -> insert data" rather than just the first one.

After this change, for Snapshot's with no existing intervals at plan time, SQLMesh can check the index in the batch and execute the "clear table" logic only for the first snapshot, and the "merge into existing table" logic for subsequent snapshots.

Fixes #2609