Dagster & Fivetran (Pythonic)
If you are just getting started with the Fivetran integration, we recommend using the new Fivetran component.
This guide provides instructions for using Dagster with Fivetran using the dagster-fivetran library. Your Fivetran connector tables can be represented as assets in the Dagster asset graph, allowing you to track lineage and dependencies between Fivetran assets and data assets you are already modeling in Dagster.
The Fivetran integration offers two capabilities:
- Observability - You can view your Fivetran assets in the Dagster Asset Graph and track sync completions. A polling sensor detects externally-triggered Fivetran syncs and emits materialization events.
- Orchestration - You can use Dagster to trigger Fivetran syncs, either on a cron schedule or based on upstream dependencies. When Fivetran reschedules a sync due to quota limits, Dagster automatically handles the retry.
Your Fivetran connectors must have been synced at least once to be represented in Dagster.
What you'll learn
- How to represent Fivetran assets in the Dagster asset graph, including lineage to other Dagster assets.
- How to observe externally-triggered Fivetran syncs using a polling sensor.
- How to materialize Fivetran connector tables from Dagster.
- How to handle Fivetran quota-based rescheduling.
- How to customize asset definition metadata for these Fivetran assets.
Prerequisites
- The dagster and dagster-fivetran libraries installed in your environment
- Familiarity with asset definitions and the Dagster asset graph
- Familiarity with Dagster resources
- Familiarity with Fivetran concepts, like connectors and connector tables
- A Fivetran workspace
- A Fivetran API key and API secret. For more information, see Getting Started in the Fivetran REST API documentation.
Set up your environment
To get started, you'll need to install the dagster and dagster-fivetran Python packages:
Using uv:
uv add dagster-fivetran
Using pip:
pip install dagster-fivetran
Represent Fivetran assets in the asset graph
To load Fivetran assets into the Dagster asset graph, you must first construct a FivetranWorkspace resource, which allows Dagster to communicate with your Fivetran workspace. You'll need to supply your account ID, API key and API secret. See Getting Started in the Fivetran REST API documentation for more information on how to create your API key and API secret.
Dagster can automatically load all connector tables from your Fivetran workspace as asset specs. Call the load_fivetran_asset_specs function, which returns a list of AssetSpec objects representing your Fivetran assets. You can then include these asset specs in your Definitions object:
from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs
import dagster as dg
fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)
fivetran_specs = load_fivetran_asset_specs(fivetran_workspace)
defs = dg.Definitions(assets=fivetran_specs, resources={"fivetran": fivetran_workspace})
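Every example in this guide reads credentials through dg.EnvVar, so a missing variable typically only shows up as an error when definitions load or a run starts. If you want to catch that earlier, for example in CI, a small stdlib-only pre-flight check is enough. The helper name missing_fivetran_env_vars is hypothetical and not part of dagster-fivetran; the variable names are the ones used throughout this guide.

```python
import os
from typing import Mapping

# The three variables read via dg.EnvVar in the examples in this guide.
REQUIRED_FIVETRAN_ENV_VARS = (
    "FIVETRAN_ACCOUNT_ID",
    "FIVETRAN_API_KEY",
    "FIVETRAN_API_SECRET",
)


def missing_fivetran_env_vars(env: Mapping[str, str] = os.environ) -> list[str]:
    """Return the required variables that are unset or empty (hypothetical helper)."""
    return [name for name in REQUIRED_FIVETRAN_ENV_VARS if not env.get(name)]
```

You could call missing_fivetran_env_vars() at the top of a CI job and fail fast if the returned list is non-empty.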
Observability
If your Fivetran connectors run on Fivetran's own scheduler, you can use a polling sensor to detect completed syncs and emit AssetMaterialization events into Dagster's event log. This allows you to view sync history and track freshness in the Dagster UI without Dagster triggering the syncs.
To set this up:
- Load your Fivetran assets as asset specs.
- Build a polling sensor using build_fivetran_polling_sensor.
- Include both in your Definitions.
from dagster_fivetran import (
FivetranWorkspace,
build_fivetran_polling_sensor,
load_fivetran_asset_specs,
)
import dagster as dg
fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)
# Load all connector tables as asset specs
fivetran_specs = load_fivetran_asset_specs(fivetran_workspace)
# Build a sensor that polls Fivetran for completed syncs and emits
# AssetMaterialization events into the Dagster event log
fivetran_polling_sensor = build_fivetran_polling_sensor(workspace=fivetran_workspace)
defs = dg.Definitions(
assets=fivetran_specs,
sensors=[fivetran_polling_sensor],
resources={"fivetran": fivetran_workspace},
)
The sensor polls Fivetran for connector status updates on each tick. When it detects a completed sync, it emits AssetMaterialization events for the synced tables. If a connector is rescheduled due to quota limits, the sensor logs a warning and retries on the next tick.
If you use the polling sensor alongside Dagster-triggered syncs, the sensor will not emit duplicate events for syncs that Dagster already materialized. You can safely enable both the polling sensor and Dagster orchestration together.
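To make the sensor behavior described above more concrete, here is a simplified, stdlib-only model of a cursor-based poller. On each tick it reports every table of each sync that completed after the stored cursor, advances the cursor, and skips syncs that a Dagster run already materialized. This is an illustration of the idea only: the SyncRecord type, the set of Dagster-triggered sync IDs, and poll_once are hypothetical, and build_fivetran_polling_sensor may track state and deduplicate differently.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class SyncRecord:
    """Hypothetical view of one completed Fivetran sync."""
    connector_id: str
    sync_id: str
    completed_at: datetime
    tables: tuple[str, ...]


def poll_once(
    syncs: list[SyncRecord],
    cursor: datetime,
    dagster_triggered_sync_ids: set[str],
) -> tuple[list[tuple[str, str]], datetime]:
    """Return (connector_id, table) pairs to report, plus the advanced cursor."""
    events: list[tuple[str, str]] = []
    new_cursor = cursor
    for sync in sorted(syncs, key=lambda s: s.completed_at):
        if sync.completed_at <= cursor:
            continue  # already handled on an earlier tick
        new_cursor = max(new_cursor, sync.completed_at)
        if sync.sync_id in dagster_triggered_sync_ids:
            continue  # a Dagster run already recorded this materialization
        events.extend((sync.connector_id, table) for table in sync.tables)
    return events, new_cursor
```

Persisting the cursor between ticks is what keeps an external sync from being reported twice; skipping Dagster-triggered syncs is what avoids double-counting runs that Dagster itself launched.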
By default, Dagster disables Fivetran's auto-schedule when it triggers a sync. To keep Fivetran's schedule active, set disable_schedule_on_trigger=False:
fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
disable_schedule_on_trigger=False,
)
Orchestration
If you want Dagster to trigger Fivetran syncs — either on a schedule or based on upstream dependencies — you can build materializable asset definitions and use Declarative Automation to manage them.
To set this up:
- Build materializable asset definitions using build_fivetran_assets_definitions.
- Apply automation conditions to trigger syncs based on upstream changes.
- Build a polling sensor to track sync completions.
from dagster_fivetran import (
FivetranWorkspace,
build_fivetran_assets_definitions,
build_fivetran_polling_sensor,
)
import dagster as dg
fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)
# Build materializable asset definitions for all connectors
all_fivetran_assets = build_fivetran_assets_definitions(workspace=fivetran_workspace)
# Automate materialization with Declarative Automation
all_fivetran_assets = [
assets_def.map_asset_specs(
lambda spec: spec.replace_attributes(
automation_condition=dg.AutomationCondition.eager()
)
)
for assets_def in all_fivetran_assets
]
# Declarative Automation sensor triggers syncs based on upstream dependencies
automation_sensor = dg.AutomationConditionSensorDefinition(
name="automation_sensor",
target="*",
default_status=dg.DefaultSensorStatus.RUNNING,
minimum_interval_seconds=1,
)
# Polling sensor detects externally completed syncs
fivetran_polling_sensor = build_fivetran_polling_sensor(workspace=fivetran_workspace)
defs = dg.Definitions(
assets=all_fivetran_assets,
sensors=[automation_sensor, fivetran_polling_sensor],
resources={"fivetran": fivetran_workspace},
)
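The example above relies on AutomationCondition.eager() to decide when to trigger a sync. As a rough mental model, eager requests a materialization when at least one upstream dependency has updated, as long as no upstream is missing or still in progress and the asset is not already running. The stdlib-only sketch below encodes that simplified reading; UpstreamState and should_request_eager are hypothetical names, and Dagster's actual condition has more terms than this.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class UpstreamState:
    """Hypothetical snapshot of one upstream dependency at evaluation time."""
    updated_since_last_request: bool
    missing: bool = False
    in_progress: bool = False


def should_request_eager(upstreams: list[UpstreamState], self_in_progress: bool = False) -> bool:
    """Simplified approximation of AutomationCondition.eager(), not Dagster's exact logic."""
    if self_in_progress:
        return False  # never stack a second sync on top of a running one
    if any(u.missing or u.in_progress for u in upstreams):
        return False  # wait until all upstream data is present and settled
    return any(u.updated_since_last_request for u in upstreams)
```

Under this model, a Fivetran asset with no upstream dependencies has nothing to react to, which is why the upstream dependency customization later in this guide pairs naturally with eager automation.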
Sync and materialize Fivetran assets
You can use Dagster to sync Fivetran connectors and materialize Fivetran connector tables. Use the build_fivetran_assets_definitions factory to create asset definitions for all connectors in your Fivetran workspace.
When syncing a Fivetran connector via Dagster, all Fivetran assets for this connector are materialized in Dagster.
from dagster_fivetran import FivetranWorkspace, build_fivetran_assets_definitions
import dagster as dg
fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)
all_fivetran_assets = build_fivetran_assets_definitions(workspace=fivetran_workspace)
defs = dg.Definitions(
assets=all_fivetran_assets,
resources={"fivetran": fivetran_workspace},
)
Customize the materialization of Fivetran assets
If you want to customize the sync of your connectors, you can use the fivetran_assets decorator to do so. This allows you to execute custom code before and after the call to the Fivetran sync.
from dagster_fivetran import FivetranWorkspace, fivetran_assets
import dagster as dg
fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)
@fivetran_assets(
connector_id="fivetran_connector_id", # Replace with your connector ID
name="fivetran_connector_name", # Replace with your connection name
group_name="fivetran_connector_name",
workspace=fivetran_workspace,
)
def fivetran_connector_assets(
context: dg.AssetExecutionContext, fivetran: FivetranWorkspace
):
# Do something before the materialization...
yield from fivetran.sync_and_poll(context=context)
# Do something after the materialization...
defs = dg.Definitions(
assets=[fivetran_connector_assets],
resources={"fivetran": fivetran_workspace},
)
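The before/after placement works because the decorated function is a generator: code after yield from runs only once the wrapped iterator is exhausted, that is, after the sync has finished and all of its events have been yielded. The stdlib-only sketch below demonstrates that ordering with a stand-in fake_sync_and_poll generator; it is hypothetical and does not call Fivetran.

```python
from collections.abc import Iterator


def fake_sync_and_poll(log: list[str]) -> Iterator[str]:
    """Stand-in for fivetran.sync_and_poll(): yields one event per synced table."""
    log.append("sync started")
    for table in ("orders", "users"):
        yield f"materialized {table}"
    log.append("sync finished")


def connector_assets(log: list[str]) -> Iterator[str]:
    log.append("before")  # e.g. validate inputs or send a notification
    yield from fake_sync_and_poll(log)
    log.append("after")  # runs only after every event above was yielded


log: list[str] = []
events = list(connector_assets(log))
```

One consequence: if the sync raises, the code after yield from does not run, so wrap it in try/finally if the "after" step must always execute.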
Perform historical resyncs of Fivetran assets
In addition to incremental syncs, you can perform full historical resyncs of your Fivetran connector tables by passing a FivetranSyncConfig with resync=True to sync_and_poll(). This is useful when you need to backfill historical data or reload data after schema changes.
You can resync specific tables by providing resync_parameters, or resync all tables in a connector by omitting this parameter:
from dagster_fivetran import FivetranSyncConfig, FivetranWorkspace, fivetran_assets
import dagster as dg
fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)
@fivetran_assets(
connector_id="fivetran_connector_id", # Replace with your connector ID
name="fivetran_connector_name", # Replace with your connection name
group_name="fivetran_connector_name",
workspace=fivetran_workspace,
)
def fivetran_connector_assets(
context: dg.AssetExecutionContext,
fivetran: FivetranWorkspace,
config: FivetranSyncConfig,
):
"""Syncs Fivetran connector with optional resync capability.
Configure at runtime:
- For normal sync: Pass config with resync=False (default)
- For historical resync of specific tables: Pass config with resync=True and resync_parameters
- For full historical resync: Pass config with resync=True and no resync_parameters
"""
yield from fivetran.sync_and_poll(context=context, config=config)
defs = dg.Definitions(
assets=[fivetran_connector_assets],
resources={"fivetran": fivetran_workspace},
)
To resync all tables in a connector, launch a run with resync=True and omit resync_parameters. If you prefer a dedicated asset definition for full resyncs, use it in place of fivetran_connector_assets rather than alongside it: both definitions target the same connector tables, so registering both in one Definitions object would produce duplicate asset keys.
@fivetran_assets(
connector_id="fivetran_connector_id",
name="fivetran_connector_name_full_resync",
group_name="fivetran_connector_name",
workspace=fivetran_workspace,
)
def fivetran_connector_full_resync_assets(
context: dg.AssetExecutionContext,
fivetran: FivetranWorkspace,
config: FivetranSyncConfig,
):
"""Performs a full historical resync of all tables in the connector.
Configure at runtime with resync=True.
"""
yield from fivetran.sync_and_poll(context=context, config=config)
Historical resyncs can be time-consuming and resource-intensive operations. Be mindful of the cost implications when resyncing large datasets.
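Because the resync mode is chosen through run config, it can help to generate that config instead of typing it by hand. The sketch below builds a plain run config dict with the standard Dagster shape {"ops": {op_name: {"config": ...}}}. Two assumptions are worth stating loudly: that the op is named after the name argument passed to @fivetran_assets, and that resync_parameters maps schema names to lists of table names (check FivetranSyncConfig and the Fivetran resync API for the exact format). build_resync_run_config is a hypothetical helper.

```python
from typing import Any, Optional


def build_resync_run_config(
    op_name: str,
    tables_by_schema: Optional[dict[str, list[str]]] = None,
) -> dict[str, Any]:
    """Build run config for an op that accepts FivetranSyncConfig (hypothetical helper).

    Assumes resync_parameters maps schema names to lists of table names.
    """
    sync_config: dict[str, Any] = {"resync": True}
    if tables_by_schema:
        # Resync only the listed tables
        sync_config["resync_parameters"] = tables_by_schema
    # Omitting resync_parameters requests a full historical resync
    return {"ops": {op_name: {"config": sync_config}}}
```

You could paste the equivalent YAML into the Launchpad, or pass the dict as run_config when launching a run programmatically.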
Handling Fivetran quota-based rescheduling
Fivetran may reschedule a connector sync when your account hits API quota limits. By default, when Dagster detects a rescheduled sync during polling, it raises a RetryRequested exception that retries the Dagster step after the rescheduled time passes. This ensures the run eventually completes.
If you prefer Dagster to continue polling in the same step rather than raising a retry, set retry_on_reschedule=False on the FivetranWorkspace resource:
# When Fivetran reschedules a sync due to quota limits, the default behavior
# is to raise RetryRequested, which will retry the Dagster step after the
# rescheduled time. Set retry_on_reschedule=False to instead continue polling
# until the rescheduled sync completes.
fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
retry_on_reschedule=False,
)
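The two behaviors differ only in what happens while Fivetran waits for its rescheduled start time. The stdlib-only sketch below models that choice: given the current time and the rescheduled time, it returns whether the step would be retried or keep polling, and how long the wait is. reschedule_action is a hypothetical model of the behavior described above, not dagster-fivetran code.

```python
from datetime import datetime, timedelta


def reschedule_action(
    now: datetime,
    rescheduled_for: datetime,
    retry_on_reschedule: bool = True,
) -> tuple[str, float]:
    """Hypothetical model of the two reschedule behaviors.

    Returns ("retry", seconds) when the step would be retried after the
    rescheduled time, or ("keep_polling", seconds) when the same step keeps
    polling. seconds is how long until the rescheduled sync should start.
    """
    wait_seconds = max((rescheduled_for - now).total_seconds(), 0.0)
    action = "retry" if retry_on_reschedule else "keep_polling"
    return action, wait_seconds
```

As a rough trade-off: retrying ends the current step attempt instead of keeping it busy for the whole wait, while polling keeps everything inside one attempt, which can be simpler to follow in the run logs when waits are short.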
Keeping Fivetran's schedule active alongside Dagster
By default, the first time Dagster triggers a Fivetran sync, it sets the connector's schedule to "manual", which disables Fivetran's auto-scheduling. This ensures Dagster is the sole orchestrator.
If you want to keep Fivetran's own schedule active while also triggering syncs from Dagster, set disable_schedule_on_trigger=False on the FivetranWorkspace resource:
fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
disable_schedule_on_trigger=False,
)
When using this mode with the polling sensor enabled, the sensor automatically deduplicates materialization events to avoid double-counting syncs that were triggered by Dagster.
Customize asset definition metadata for Fivetran assets
By default, Dagster generates an asset spec for each Fivetran asset and populates default metadata. You can further customize asset properties by passing an instance of a custom DagsterFivetranTranslator subclass to the load_fivetran_asset_specs function.
from dagster_fivetran import (
DagsterFivetranTranslator,
FivetranConnectorTableProps,
FivetranWorkspace,
load_fivetran_asset_specs,
)
import dagster as dg
fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)
# A translator class lets us customize properties of the built
# Fivetran assets, such as the owners or asset key
class MyCustomFivetranTranslator(DagsterFivetranTranslator):
def get_asset_spec(self, props: FivetranConnectorTableProps) -> dg.AssetSpec:
# We create the default asset spec using super()
default_spec = super().get_asset_spec(props)
# We customize the metadata and asset key prefix for all assets
return default_spec.replace_attributes(
key=default_spec.key.with_prefix("prefix"),
).merge_attributes(metadata={"custom": "metadata"})
fivetran_specs = load_fivetran_asset_specs(
fivetran_workspace, dagster_fivetran_translator=MyCustomFivetranTranslator()
)
defs = dg.Definitions(assets=fivetran_specs, resources={"fivetran": fivetran_workspace})
Note that super() is called in the overridden method to generate the default asset spec. It is best practice to generate the default asset spec before customizing it.
You can pass an instance of the custom DagsterFivetranTranslator to the fivetran_assets decorator or the build_fivetran_assets_definitions factory.
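To see what the translator above does to each spec, the stdlib-only sketch below mirrors, in simplified form, its two customizations: prefixing the asset key (like key.with_prefix) and adding metadata (like merge_attributes). SpecSketch is a hypothetical stand-in for dg.AssetSpec that models the key as a tuple of path components, and the example key ("my_schema", "my_table") is made up; how collisions between old and new metadata keys are resolved here is this sketch's choice.

```python
from dataclasses import dataclass, field, replace


@dataclass(frozen=True)
class SpecSketch:
    """Stand-in for dg.AssetSpec holding only the fields this example touches."""
    key: tuple[str, ...]
    metadata: dict = field(default_factory=dict)


def customize(default: SpecSketch, prefix: str, extra_metadata: dict) -> SpecSketch:
    # Like key.with_prefix(prefix): the prefix becomes the first key component
    prefixed = replace(default, key=(prefix, *default.key))
    # Like merge_attributes(metadata=...): entries are combined with the
    # existing metadata (in this sketch, new values win on key collisions)
    return replace(prefixed, metadata={**prefixed.metadata, **extra_metadata})


default = SpecSketch(key=("my_schema", "my_table"), metadata={"dagster/table_name": "db.my_schema.my_table"})
custom = customize(default, "prefix", {"custom": "metadata"})
```

The default spec is left untouched; each step returns a new object, which is also how the AssetSpec methods used above are meant to be chained.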
Fetching column-level metadata for Fivetran assets
Dagster allows you to emit column-level metadata, like column schema and column lineage, as materialization metadata.
With this metadata, you can view documentation in Dagster for all columns in your Fivetran connector tables.
To enable this feature, call fetch_column_metadata() on the FivetranEventIterator returned by the sync_and_poll() call on the FivetranWorkspace resource.
from dagster_fivetran import FivetranWorkspace, fivetran_assets
import dagster as dg
fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)
@fivetran_assets(
# Replace with your connector ID
connector_id="fivetran_connector_id",
workspace=fivetran_workspace,
)
def fivetran_connector_assets(
context: dg.AssetExecutionContext, fivetran: FivetranWorkspace
):
yield from fivetran.sync_and_poll(context=context).fetch_column_metadata()
defs = dg.Definitions(
assets=[fivetran_connector_assets],
resources={"fivetran": fivetran_workspace},
)
Load Fivetran assets for selected connectors
To load Fivetran assets for only a subset of your connectors, pass a ConnectorSelectorFn callback as the connector_selector_fn argument and define your selection conditions in it.
from dagster_fivetran import FivetranWorkspace, build_fivetran_assets_definitions
import dagster as dg
fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)
all_fivetran_assets = build_fivetran_assets_definitions(
workspace=fivetran_workspace,
connector_selector_fn=(
lambda connector: connector.id in {"some_connector_id", "another_connector_id"}
),
)
defs = dg.Definitions(
assets=all_fivetran_assets,
resources={"fivetran": fivetran_workspace},
)
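Selection conditions can grow beyond a fixed set of IDs. The stdlib-only sketch below builds small, composable selector predicates. ConnectorSketch is a hypothetical stand-in for the connector object passed to connector_selector_fn: only the id attribute is confirmed by the example above, while name and service are assumed attributes that you should verify against your dagster-fivetran version. by_ids, by_service, and any_of are hypothetical helpers.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class ConnectorSketch:
    """Stand-in for a Fivetran connector; name and service are assumed attributes."""
    id: str
    name: str
    service: str


Selector = Callable[[ConnectorSketch], bool]


def by_ids(*ids: str) -> Selector:
    allowed = frozenset(ids)
    return lambda connector: connector.id in allowed


def by_service(service: str) -> Selector:
    return lambda connector: connector.service == service


def any_of(*selectors: Selector) -> Selector:
    # Select a connector if at least one of the given predicates matches it
    return lambda connector: any(select(connector) for select in selectors)


connectors = [
    ConnectorSketch("a", "sales_pg", "postgres"),
    ConnectorSketch("b", "crm", "salesforce"),
    ConnectorSketch("c", "ads", "google_ads"),
]
selector = any_of(by_ids("a"), by_service("salesforce"))
selected_ids = [c.id for c in connectors if selector(c)]
```

In a project, you would pass something like connector_selector_fn=any_of(by_ids("some_connector_id"), ...) to build_fivetran_assets_definitions, assuming the attributes you filter on exist on the real connector object.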
Load Fivetran assets using a snapshot
Fivetran assets can be loaded using the snapshot of a Fivetran workspace, which allows organizations with large amounts of Fivetran data to speed up their deployment process.
from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs
import dagster as dg
fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
snapshot_path=dg.EnvVar("FIVETRAN_SNAPSHOT_PATH"),
)
fivetran_specs = load_fivetran_asset_specs(workspace=fivetran_workspace)
defs = dg.Definitions(assets=fivetran_specs)
To capture the snapshot, use the dagster-fivetran snapshot CLI:
dagster-fivetran snapshot --python-module my_dagster_package --output-path snapshot.snap
Creating Fivetran jobs and schedules
Once you have your Fivetran assets, you can define a job to materialize all of them.
from dagster_fivetran import FivetranWorkspace, build_fivetran_assets_definitions
import dagster as dg
fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)
all_fivetran_assets = build_fivetran_assets_definitions(workspace=fivetran_workspace)
all_fivetran_assets_job = dg.define_asset_job(
name="all_fivetran_assets_job",
selection=all_fivetran_assets,
)
defs = dg.Definitions(
assets=all_fivetran_assets,
jobs=[all_fivetran_assets_job],
resources={"fivetran": fivetran_workspace},
)
You can also define a job for a selection of these assets, or for the assets of a single connector.
from dagster_fivetran import FivetranWorkspace, fivetran_assets
import dagster as dg
fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)
@fivetran_assets(
connector_id="fivetran_connector_id", # Replace with your connector ID
name="fivetran_connector_name", # Replace with your connection name
workspace=fivetran_workspace,
)
def fivetran_connector_assets(
context: dg.AssetExecutionContext, fivetran: FivetranWorkspace
):
yield from fivetran.sync_and_poll(context=context)
fivetran_connector_assets_job = dg.define_asset_job(
name="fivetran_connector_assets_job",
selection=[fivetran_connector_assets],
)
defs = dg.Definitions(
assets=[fivetran_connector_assets],
jobs=[fivetran_connector_assets_job],
resources={"fivetran": fivetran_workspace},
)
Finally, jobs created for your Fivetran assets can be scheduled.
fivetran_connector_assets_schedule = dg.ScheduleDefinition(
job=fivetran_connector_assets_job,
cron_schedule="0 0 * * *", # Runs at midnight daily
)
defs = dg.Definitions(
assets=[fivetran_connector_assets],
jobs=[fivetran_connector_assets_job],
schedules=[fivetran_connector_assets_schedule],
resources={"fivetran": fivetran_workspace},
)
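The cron expression "0 0 * * *" fires once a day at 00:00. By default Dagster evaluates cron_schedule in UTC unless you set execution_timezone on the ScheduleDefinition, so "midnight" means UTC midnight unless configured otherwise. The stdlib-only sketch below computes the next tick of this particular expression from a timezone-aware timestamp; next_midnight_run is a hypothetical helper for reasoning about the schedule, not a general cron parser.

```python
from datetime import datetime, time, timedelta, timezone


def next_midnight_run(now: datetime) -> datetime:
    """Next tick of the cron schedule "0 0 * * *" strictly after `now`."""
    if now.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime")
    midnight_today = datetime.combine(now.date(), time(0, 0), tzinfo=now.tzinfo)
    # Today's midnight is never strictly after `now`, so the next tick is tomorrow
    return midnight_today + timedelta(days=1)


utc = timezone.utc
next_run = next_midnight_run(datetime(2024, 3, 5, 14, 30, tzinfo=utc))
```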
Load Fivetran assets from multiple workspaces
Definitions from multiple Fivetran workspaces can be combined by instantiating multiple FivetranWorkspace resources and merging their specs. This lets you view all your Fivetran assets in a single asset graph:
from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs
import dagster as dg
sales_fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_SALES_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_SALES_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_SALES_API_SECRET"),
)
marketing_fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_MARKETING_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_MARKETING_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_MARKETING_API_SECRET"),
)
sales_fivetran_specs = load_fivetran_asset_specs(sales_fivetran_workspace)
marketing_fivetran_specs = load_fivetran_asset_specs(marketing_fivetran_workspace)
# Merge the specs into a single set of definitions
defs = dg.Definitions(
assets=[*sales_fivetran_specs, *marketing_fivetran_specs],
resources={
"marketing_fivetran": marketing_fivetran_workspace,
"sales_fivetran": sales_fivetran_workspace,
},
)
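Merging works only while the two workspaces produce distinct asset keys, because Dagster requires asset keys to be unique within a Definitions object. If both workspaces sync a table that maps to the same key, you need to disambiguate, for example with a key prefix via a custom translator. The stdlib-only sketch below finds such collisions; it models keys as tuples, which in a real project you could build from each spec with tuple(spec.key.path). duplicate_keys is a hypothetical helper and the example keys are made up.

```python
from collections import Counter
from collections.abc import Iterable


def duplicate_keys(*key_groups: Iterable[tuple[str, ...]]) -> set[tuple[str, ...]]:
    """Return asset keys that appear more than once across all groups."""
    counts = Counter(key for group in key_groups for key in group)
    return {key for key, count in counts.items() if count > 1}


sales_keys = [("salesforce", "accounts"), ("salesforce", "opportunities")]
marketing_keys = [("google_ads", "campaigns"), ("salesforce", "accounts")]
collisions = duplicate_keys(sales_keys, marketing_keys)
```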
Define upstream dependencies
By default, Dagster does not set upstream dependencies when generating asset specs for your Fivetran assets. You can set upstream dependencies on your Fivetran assets by passing an instance of a custom DagsterFivetranTranslator subclass to the load_fivetran_asset_specs function.
class MyCustomFivetranTranslator(DagsterFivetranTranslator):
def get_asset_spec(self, props: FivetranConnectorTableProps) -> dg.AssetSpec:
# We create the default asset spec using super()
default_spec = super().get_asset_spec(props)
# We set an upstream dependency for our assets
return default_spec.replace_attributes(deps=["my_upstream_asset_key"])
fivetran_specs = load_fivetran_asset_specs(
fivetran_workspace, dagster_fivetran_translator=MyCustomFivetranTranslator()
)
Note that super() is called in the overridden method to generate the default asset spec. It is best practice to generate the default asset spec before customizing it.
You can pass an instance of the custom DagsterFivetranTranslator to the fivetran_assets decorator or the build_fivetran_assets_definitions factory.
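The translator above gives every Fivetran table the same upstream dependency. Often each table depends on something different, which you can express as a lookup. The stdlib-only sketch below keeps that mapping in one place; the table identifiers, upstream keys, UPSTREAMS_BY_TABLE, and deps_for_table are all hypothetical, and it assumes you can derive a table identifier from the props passed to get_asset_spec (check FivetranConnectorTableProps for the exact attribute).

```python
# Hypothetical mapping from a Fivetran table identifier to the Dagster asset
# keys it should depend on; tables not listed get no upstream dependencies.
UPSTREAMS_BY_TABLE: dict[str, list[str]] = {
    "salesforce.accounts": ["crm_export"],
    "salesforce.opportunities": ["crm_export", "pipeline_config"],
}


def deps_for_table(table: str, upstreams: dict[str, list[str]] = UPSTREAMS_BY_TABLE) -> list[str]:
    """Return the upstream asset keys for one table (empty if none are configured)."""
    return list(upstreams.get(table, []))
```

Inside get_asset_spec you would then pass the result to default_spec.replace_attributes(deps=...) instead of a fixed list.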
Define downstream dependencies
Dagster allows you to define assets that are downstream of specific Fivetran tables using their asset keys. You can retrieve the asset key for a Fivetran table from the specs of the asset definition created with the fivetran_assets decorator. The following example defines my_downstream_asset as a downstream dependency of my_fivetran_table:
@fivetran_assets(
# Replace with your connector ID
connector_id="fivetran_connector_id",
workspace=fivetran_workspace,
)
def fivetran_connector_assets(
context: dg.AssetExecutionContext, fivetran: FivetranWorkspace
): ...
# Find the spec whose table name metadata matches the Fivetran table
my_fivetran_table_asset_key = next(
spec.key
for spec in fivetran_connector_assets.specs
if spec.metadata.get("dagster/table_name")
== "my_database.my_schema.my_fivetran_table"
)
@dg.asset(deps=[my_fivetran_table_asset_key])
def my_downstream_asset(): ...
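The next(...) lookup above raises a bare StopIteration when no spec matches, which can be confusing when it surfaces while definitions load. The stdlib-only sketch below wraps the same metadata match in a helper that demands exactly one match and raises a descriptive error otherwise. TableSpecSketch is a hypothetical stand-in for dg.AssetSpec with keys modeled as tuples, and key_for_table is a hypothetical helper; with real specs you would pass fivetran_connector_assets.specs instead.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class TableSpecSketch:
    """Stand-in for dg.AssetSpec with only a key and metadata."""
    key: tuple[str, ...]
    metadata: dict = field(default_factory=dict)


def key_for_table(specs, table_name: str):
    """Return the key of the single spec whose dagster/table_name matches."""
    matches = [
        spec.key
        for spec in specs
        if spec.metadata.get("dagster/table_name") == table_name
    ]
    if len(matches) != 1:
        raise LookupError(f"expected exactly one asset for {table_name!r}, found {len(matches)}")
    return matches[0]


specs = [
    TableSpecSketch(("schema", "a"), {"dagster/table_name": "db.schema.a"}),
    TableSpecSketch(("schema", "b"), {"dagster/table_name": "db.schema.b"}),
]
```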
In the downstream asset, you may want direct access to the contents of the Fivetran table. To do so, you can customize the code within your @asset-decorated function to load upstream data.
About Fivetran
Fivetran ingests data from SaaS applications, databases, and servers and loads it into a destination, such as a data warehouse, where it is typically used for analytics.