Could not load job definition. dagster._check.CheckError: Invariant failed. Description: No metadata found for CacheableAssetsDefinition with unique_id airbyte / Using I/O Manager BigQuery #21939

Open
kamegg13 opened this issue May 17, 2024 · 0 comments
Labels
type: bug Something isn't working

Dagster version

dagster, version 1.7.6

What's the issue?

Hi there, I am running Dagster with Docker Compose and a gRPC user-code server. I am trying to use Airbyte assets and manage tables with the BigQuery I/O manager, but every asset fails to materialize with the following error in the logs:

Could not load job definition.
dagster._check.CheckError: Invariant failed. Description: No metadata found for CacheableAssetsDefinition with unique_id airbyte-88b33e4e12f75ac8bf792aebde41f1a090f3a612_resources_f2c84413fea03353d6642a5c77dd4aa2a40a858b.
  File "/usr/local/lib/python3.10/site-packages/dagster/_grpc/impl.py", line 136, in core_execute_run
    recon_job.get_definition()
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/definitions/reconstruct.py", line 265, in get_definition
    return check.not_none(self.get_repository_definition()).get_maybe_subset_job_def(
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/definitions/reconstruct.py", line 259, in get_repository_definition
    return self.repository.get_definition()
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/definitions/reconstruct.py", line 119, in get_definition
    return repository_def_from_pointer(self.pointer, self.repository_load_data)
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/definitions/reconstruct.py", line 744, in repository_def_from_pointer
    repo_def = repository_def_from_target_def(target, repository_load_data)
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/definitions/reconstruct.py", line 735, in repository_def_from_target_def
    return target.reconstruct_repository_definition(repository_load_data)
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/definitions/repository_definition/repository_definition.py", line 514, in reconstruct_repository_definition
    return self._get_repository_definition(repository_load_data)
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/definitions/repository_definition/repository_definition.py", line 479, in _get_repository_definition
    check.invariant(
  File "/usr/local/lib/python3.10/site-packages/dagster/_check/__init__.py", line 1591, in invariant
    raise CheckError(f"Invariant failed. Description: {desc}")
(Screenshot: 2024-05-17 at 14:49:45, showing the error above.)

Even when I call load_assets_from_airbyte_instance without an io_manager_key (i.e. io_manager_key=None), I get the same error. The same happens when I configure an in-memory I/O manager with InMemoryIOManager(). I also tried deleting the iris example assets and keeping only the Airbyte assets, but the error remains.

The mere presence of the BigQuery I/O manager in resources is enough to trigger this error.
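
A minimal sketch of the two variants I tried (variable names here are only for illustration; it assumes the same airbyte_instance and io_manager_bigquery objects defined in the full code further down):

from dagster import Definitions, InMemoryIOManager
from dagster_airbyte import load_assets_from_airbyte_instance

# Variant 1: no io_manager_key at all (defaults to None) -- same error.
airbyte_assets_default = load_assets_from_airbyte_instance(airbyte_instance)

# Variant 2: route the Airbyte assets to an in-memory I/O manager -- same error.
airbyte_assets_memory = load_assets_from_airbyte_instance(
    airbyte_instance,
    io_manager_key="memory_io_manager",
)

defs = Definitions(
    assets=[airbyte_assets_memory],
    resources={
        "memory_io_manager": InMemoryIOManager(),
        # Removing this BigQuery I/O manager makes the error disappear.
        "bigquery_io_manager": io_manager_bigquery,
    },
)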

I think my setup is correct, because when I use the Airbyte assets and the BigQuery I/O manager separately, everything works fine.
I get the same error on the older version 1.7.2.

What did you expect to happen?

I expected the assets to just work and materialize properly.

How to reproduce?

  1. Launch Airbyte Locally on localhost:8000 with default credentials

  2. Create a connection between a source and a destination on Airbyte (you can use Faker)

  3. Use Dagster with Docker (if you can) in a multi-container configuration (you can use the docker-dagster example repository). I think you get the same error if you launch Dagster without Docker.

  4. Try to materialize an asset (they all give the same error).

Use the code below on Dagster 1.7.6 (or 1.7.2) and try to materialize an asset (an Airbyte asset or any other).
Asset code:

from dagster import (
    asset,
    EnvVar
)
from dagster_airbyte import AirbyteResource, load_assets_from_airbyte_instance
import pandas as pd

# * ---------------------  Test IO Manager ---------------------
@asset(io_manager_key="bigquery_io_manager")
def iris_data() -> pd.DataFrame:
    return pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )

@asset(io_manager_key="bigquery_io_manager")
def iris_setosa(iris_data: pd.DataFrame) -> pd.DataFrame:
    return iris_data[iris_data["species"] == "Iris-setosa"]

# * ----------------------------------Airbyte----------------------------------
airbyte_instance = AirbyteResource(
    host=EnvVar('AIRBYTE_HOST').get_value(),
    port=EnvVar('AIRBYTE_PORT').get_value(),
    username=EnvVar('AIRBYTE_USERNAME').get_value(),
    password=EnvVar('AIRBYTE_PASSWORD').get_value(),
)

# *------- Load Airbyte Assets -----------
airbyte_assets = load_assets_from_airbyte_instance(
    airbyte_instance,
    io_manager_key="memory_io_manager",
)

Definitions Code:


from dagster import (
    define_asset_job,
    ScheduleDefinition,
    Definitions,
    EnvVar,
    load_assets_from_modules,
)
from dagster_gcp_pandas import BigQueryPandasIOManager
from assets import *
import assets
# *---------------------Loading Assets---------------------

all_assets = load_assets_from_modules([assets])

# * ---------------------Jobs---------------------
all_assets_job = define_asset_job("run_everything", selection="*")

# *---------------------Schedules---------------------

all_schedule = ScheduleDefinition(
    job=all_assets_job,
    cron_schedule="@daily",
)
# *---------------------Resources---------------------

io_manager_bigquery = BigQueryPandasIOManager(
    project=EnvVar('GOOGLE_PROJECT_ID').get_value(),
    location=EnvVar('GOOGLE_LOCATION').get_value(),
    dataset=EnvVar('GOOGLE_DATASET_SILVER_ID').get_value(),
    gcp_credentials=EnvVar('GOOGLE_APPLICATION_CREDENTIALS_BASE64').get_value(),
)

# *---------------------Definitions---------------------
defs = Definitions(
    # * ----- Define ASSETS -----
    assets=all_assets,
    # * ----- Define JOBS -----
    jobs=[all_assets_job],
    # * ----- Define SCHEDULES -----
    schedules=[all_schedule],
    # * ----- Define GOOGLE BIGQUERY RESOURCES -----
    resources={"bigquery_io_manager": io_manager_bigquery},
)

Deployment type

Docker Compose

Deployment details

Docker Compose file version 3.7
python:3.10-slim

Using two Dockerfiles: Dockerfile_user_code and Dockerfile_dagster.
You need to add your Airbyte credentials and Google BigQuery credentials as environment variables in docker-compose.yml (especially for the docker_daemon and docker_user_code services) and reference these env vars in dagster.yaml.

Dagster.yaml

telemetry:
  enabled: false

scheduler:
  module: dagster.core.scheduler
  class: DagsterDaemonScheduler

run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator

run_launcher:
  module: dagster_docker
  class: DockerRunLauncher
  config:
    env_vars:
      - DAGSTER_POSTGRES_USER
      - DAGSTER_POSTGRES_PASSWORD
      - DAGSTER_POSTGRES_DB
      - AIRBYTE_HOST
      - AIRBYTE_PORT
      - AIRBYTE_USERNAME
      - AIRBYTE_PASSWORD
      
    network: docker_network
    container_kwargs:
      volumes: # Make docker client accessible to any launched containers as well
        - /var/run/docker.sock:/var/run/docker.sock
        - /tmp/io_manager_storage:/tmp/io_manager_storage

run_storage:
  module: dagster_postgres.run_storage
  class: PostgresRunStorage
  config:
    postgres_db:
      hostname: docker_postgresql
      username:
        env: DAGSTER_POSTGRES_USER
      password:
        env: DAGSTER_POSTGRES_PASSWORD
      db_name:
        env: DAGSTER_POSTGRES_DB
      port: 5432

schedule_storage:
  module: dagster_postgres.schedule_storage
  class: PostgresScheduleStorage
  config:
    postgres_db:
      hostname: docker_postgresql
      username:
        env: DAGSTER_POSTGRES_USER
      password:
        env: DAGSTER_POSTGRES_PASSWORD
      db_name:
        env: DAGSTER_POSTGRES_DB
      port: 5432

# Configuration for the event log storage in Dagster.
event_log_storage:
  module: dagster_postgres.event_log
  class: PostgresEventLogStorage
  config:
    # Configuration for the PostgreSQL database.
    postgres_db:
      hostname: docker_postgresql
      username:
        env: DAGSTER_POSTGRES_USER  # The username for connecting to the PostgreSQL database.
      password:
        env: DAGSTER_POSTGRES_PASSWORD  # The password for connecting to the PostgreSQL database.
      db_name:  # The name of the PostgreSQL database.
        env: DAGSTER_POSTGRES_DB
      port: 5432
  

Dockerfile_dagster

# Dagster libraries to run both dagster-webserver and the dagster-daemon. Does not
# need to have access to any pipeline code.

FROM python:3.10-slim

RUN pip install \
    dagster==1.7.6 \
    dagster-graphql==1.7.6 \
    dagster-webserver==1.7.6 \
    dagster-postgres==0.23.6\
    dagster-docker==0.23.6 

# Set $DAGSTER_HOME and copy dagster instance and workspace YAML there
ENV DAGSTER_HOME=/opt/dagster/dagster_home/

RUN mkdir -p $DAGSTER_HOME

COPY dagster.yaml workspace.yaml $DAGSTER_HOME

WORKDIR $DAGSTER_HOME

Dockerfile_user_code


FROM python:3.10-slim

RUN pip install \
    dagster==1.7.6\
    dagster-postgres \
    dagster-docker==0.23.6 \
    dagster_airbyte==0.23.6 \
    pandas==2.2.2 \
    dagster_gcp==0.23.6 \
    dagster_gcp_pandas==0.23.6 

RUN apt-get update && apt-get install -y curl
#
WORKDIR /opt/dagster/app

COPY main.py /opt/dagster/app
COPY assets.py /opt/dagster/app

EXPOSE 4000
# CMD allows this to be overridden from run launchers or executors that want
# to run other commands against your repository
CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4000", "-f","main.py"]

workspace.yaml

load_from:
  # Each entry here corresponds to a service in the docker-compose file that exposes user code.
  - grpc_server:
      host: docker_user_code
      port: 4000
      location_name: "pipeline"

docker-compose.yml


version: "3.7"

services:

  docker_postgresql:
    image: postgres:11
    container_name: docker_postgresql
    environment:
      POSTGRES_USER: "postgres_user"
      POSTGRES_PASSWORD: "postgres_password"
      POSTGRES_DB: "postgres_db"
      TEST_VAR: "test_valuepostgres"
    networks:
      - docker_network

  docker_user_code:
    environment:
      DAGSTER_POSTGRES_USER: "postgres_user"
      DAGSTER_POSTGRES_PASSWORD: "postgres_password"
      DAGSTER_POSTGRES_DB: "postgres_db"
      DAGSTER_CURRENT_IMAGE: "docker_user_code_image"

   
      AIRBYTE_HOST: "" # ADD ENV VAR HERE
      AIRBYTE_PORT: "8000"
      AIRBYTE_USERNAME: "" # ADD ENV VAR HERE
      AIRBYTE_PASSWORD: "" # ADD ENV VAR HERE

    build:
      context: .
      dockerfile: ./Dockerfile_user_code
    container_name: docker_user_code
    image: docker_user_code_image
    restart: always
    networks:
      - docker_network

  docker_webserver:
    build:
      context: .
      dockerfile: ./Dockerfile_dagster
    entrypoint:
      - dagster-webserver
      - -h
      - "0.0.0.0"
      - -p
      - "3000"
      - -w
      - workspace.yaml
    container_name: docker_webserver
    expose:
      - "3000"
    ports:
      - "3000:3000"
    environment:
      DAGSTER_POSTGRES_USER: "postgres_user"
      DAGSTER_POSTGRES_PASSWORD: "postgres_password"
      DAGSTER_POSTGRES_DB: "postgres_db"

    volumes: # Make docker client accessible so we can terminate containers from the webserver
      - /var/run/docker.sock:/var/run/docker.sock
      - /tmp/io_manager_storage:/tmp/io_manager_storage
    networks:
      - docker_network
    depends_on:
      - docker_postgresql
      - docker_user_code

  # This service runs the dagster-daemon process, which is responsible for taking runs
  # off of the queue and launching them, as well as creating runs from schedules or sensors.
  docker_daemon:
    build:
      context: .
      dockerfile: ./Dockerfile_dagster
    entrypoint:
      - dagster-daemon
      - run
    container_name: docker_daemon
    restart: on-failure
    environment:
      DAGSTER_POSTGRES_USER: "postgres_user"
      DAGSTER_POSTGRES_PASSWORD: "postgres_password"
      DAGSTER_POSTGRES_DB: "postgres_db"
      AIRBYTE_HOST: "" # ADD ENV VAR HERE 
      AIRBYTE_PORT: "8000"
      AIRBYTE_USERNAME: "" # ADD ENV VAR HERE
      AIRBYTE_PASSWORD:   "" # ADD ENV VAR HERE

    env_file:
      - .env
    volumes: # Make docker client accessible so we can launch containers using host docker
      - /var/run/docker.sock:/var/run/docker.sock
      - /tmp/io_manager_storage:/tmp/io_manager_storage
    networks:
      - docker_network
    depends_on:
      - docker_postgresql
      - docker_user_code

networks:
  docker_network:
    driver: bridge
    name: docker_network



Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.
