Build pipelines with GCP Dataproc

This article covers how to use Dagster Pipes to submit jobs to GCP Dataproc.

The dagster-gcp integration library provides the pipes.PipesDataprocJobClient resource, which can be used to launch GCP Dataproc jobs from Dagster assets and ops. Dagster can receive events such as logs, asset checks, or asset materializations from jobs launched with this client. The client requires minimal code changes to your Dataproc jobs.

Prerequisites
  • In the Dagster environment, you'll need to:

    • Install the following packages:

      pip install dagster dagster-webserver 'dagster-gcp[dataproc]'

      Refer to the Dagster installation guide for more info.

    • Configure GCP authentication for applications. If you don't have this set up already, refer to the GCP authentication guide. (A quick way to verify your credentials is sketched after this list.)

  • In GCP, you'll need:

    • An existing project with a Dataproc cluster.
    • Prepared infrastructure such as GCS buckets, IAM roles, and other resources required for your Dataproc job.
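
To confirm the authentication prerequisite is satisfied in the Dagster environment, you can check that Application Default Credentials resolve. The following is a minimal sketch, assuming the google-auth package is available (it is installed alongside the GCP client libraries); it is not part of the pipeline itself:

import google.auth

# Resolves Application Default Credentials, e.g. from
# `gcloud auth application-default login` or the GOOGLE_APPLICATION_CREDENTIALS
# environment variable.
credentials, project_id = google.auth.default()
print(f"Using Application Default Credentials for project: {project_id}")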

Step 1: Install the dagster-pipes module in your Dataproc environment

Install dagster-pipes in the Dataproc environment. One option is to set the dataproc:pip.packages cluster property, which tells Dataproc to pip-install the listed packages on the cluster:

dataproc:pip.packages: "dagster-pipes,google-cloud-storage"

google-cloud-storage is an optional dependency of dagster-pipes, but it's required here because Pipes messages are passed from the Dataproc job back to Dagster through a GCS bucket.
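
If you manage the cluster with the google-cloud-dataproc Python client, the same property can be set through the cluster's SoftwareConfig when the cluster is created. The sketch below is a minimal illustration using the placeholder project, region, and cluster names from the examples later in this guide; a real cluster will likely need additional configuration (machine types, networking, and so on):

from google.cloud.dataproc_v1 import (
    Cluster,
    ClusterConfig,
    ClusterControllerClient,
    SoftwareConfig,
)

# Placeholder project/region/cluster names; substitute your own.
cluster_client = ClusterControllerClient(
    client_options={"api_endpoint": "us-central1-dataproc.googleapis.com:443"}
)
operation = cluster_client.create_cluster(
    project_id="dagster-infra",
    region="us-central1",
    cluster=Cluster(
        project_id="dagster-infra",
        cluster_name="dagster",
        config=ClusterConfig(
            software_config=SoftwareConfig(
                properties={
                    "dataproc:pip.packages": "dagster-pipes,google-cloud-storage",
                },
            ),
        ),
    ),
)
operation.result()  # block until the cluster is ready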

Step 2: Add dagster-pipes to the Dataproc job script

Call open_dagster_pipes in the Dataproc script to create a context that can be used to send messages to Dagster:

from dagster_pipes import (
    PipesCliArgsParamsLoader,
    PipesGCSContextLoader,
    PipesGCSMessageWriter,
    open_dagster_pipes,
)
from google.cloud.storage import Client as GCSClient
from pyspark.sql import SparkSession


def main():
    gcs_client = GCSClient()
    with open_dagster_pipes(
        context_loader=PipesGCSContextLoader(client=gcs_client),
        message_writer=PipesGCSMessageWriter(client=gcs_client),
        params_loader=PipesCliArgsParamsLoader(),
    ) as pipes:
        pipes.log.info("Hello from GCP Dataproc!")

        print("Hello from GCP Dataproc Spark driver stdout!")

        spark = SparkSession.builder.appName("HelloWorld").getOrCreate()

        df = spark.createDataFrame(
            [(1, "Alice", 34), (2, "Bob", 45), (3, "Charlie", 56)],
            ["id", "name", "age"],
        )

        # calculate a really important statistic
        avg_age = float(df.agg({"age": "avg"}).collect()[0][0])

        # attach it to the asset materialization in Dagster
        pipes.report_asset_materialization(
            metadata={"average_age": {"raw_value": avg_age, "type": "float"}},
            data_version="alpha",
        )

        spark.stop()


if __name__ == "__main__":
    main()

tip

The metadata format shown above ({"raw_value": value, "type": type}) is part of Dagster Pipes' special syntax for specifying rich Dagster metadata. For a complete reference of all supported metadata types and their formats, see the Dagster Pipes metadata reference.
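
For instance, a hypothetical variation of the report_asset_materialization call in the script above could attach several differently typed metadata entries. The keys here are illustrative, and this would replace the single existing call rather than being added alongside it, since a materialization should only be reported once per run:

pipes.report_asset_materialization(
    metadata={
        "average_age": {"raw_value": avg_age, "type": "float"},
        "row_count": {"raw_value": df.count(), "type": "int"},
        "script_uri": {"raw_value": "gs://dagster-pipes/script.py", "type": "url"},
        "notes": {"raw_value": "computed on Dataproc", "type": "text"},
    },
    data_version="alpha",
)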

Step 3: Create an asset using the PipesDataprocJobClient to launch the job

In the Dagster asset/op code, use the PipesDataprocJobClient resource to launch the job:

from dagster_gcp.pipes import (
    PipesDataprocJobClient,
    PipesGCSContextInjector,
    PipesGCSMessageReader,
)
from google.cloud.dataproc_v1 import (
    Job,
    JobControllerClient,
    JobPlacement,
    PySparkJob,
    SubmitJobRequest,
)
from google.cloud.storage import Client as GCSClient

import dagster as dg


@dg.asset
def dataproc_job_asset(
    context: dg.AssetExecutionContext,
    dataproc_job_client: PipesDataprocJobClient,
):
    return dataproc_job_client.run(
        context=context,
        submit_job_params={
            "request": SubmitJobRequest(
                region="us-central1",
                project_id="dagster-infra",
                job=Job(
                    placement=JobPlacement(cluster_name="dagster"),
                    pyspark_job=PySparkJob(
                        main_python_file_uri="gs://dagster-pipes/script.py",
                        # args=["./venv.pex", "./script.py"],
                        file_uris=[
                            # "gs://dagster-pipes/venv.pex",
                            # "gs://dagster-pipes/script.py",
                        ],
                        properties={
                            "spark.pyspark.python": "/pexenvs/venv.pex/bin/python",
                            # "spark.pyspark.driver.python": "/pexenvs/venv.pex",
                            # "dataproc:pip.packages": "google-cloud-storage,git+https://github.com/dagster-io/dagster.git@main",
                            # load gs://dagster-pipes/venv.pex as venv.pex file
                        },
                    ),
                ),
            )
        },
    ).get_results()

This will launch the Dataproc job and wait for it to complete. If the job fails, the Dagster process will raise an exception. If the Dagster process is interrupted while the job is still running, the job will be terminated.

Setting include_stdio_in_messages=True on the PipesGCSMessageReader (shown in the Definitions example in the next step) forwards stdout and stderr from the job driver to Dagster.

Step 4: Create Dagster definitions

Next, add the PipesDataprocJobClient resource to your project's Definitions object:


from dagster import Definitions


defs = Definitions(
    assets=[dataproc_job_asset],
    resources={
        "dataproc_job_client": PipesDataprocJobClient(
            client=JobControllerClient(
                client_options={
                    "api_endpoint": "us-central1-dataproc.googleapis.com:443"
                },
            ),
            message_reader=PipesGCSMessageReader(
                bucket="dagster-pipes",
                client=GCSClient(project="dagster-infra"),
                include_stdio_in_messages=True,
            ),
            context_injector=PipesGCSContextInjector(
                bucket="dagster-pipes",
                client=GCSClient(project="dagster-infra"),
            ),
        )
    },
)

Dagster will now be able to launch the GCP Dataproc job from the dataproc_job_asset asset, and receive logs and events from the job.
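
To try the asset outside the Dagster webserver, you can also launch an in-process run with dg.materialize. The following is a minimal, hypothetical sketch that simply reuses the same resource configuration bound in the Definitions object above:

import dagster as dg

if __name__ == "__main__":
    # Materialize the asset in-process, using the same resource configuration
    # as the Definitions object above.
    result = dg.materialize(
        [dataproc_job_asset],
        resources={
            "dataproc_job_client": PipesDataprocJobClient(
                client=JobControllerClient(
                    client_options={
                        "api_endpoint": "us-central1-dataproc.googleapis.com:443"
                    },
                ),
                message_reader=PipesGCSMessageReader(
                    bucket="dagster-pipes",
                    client=GCSClient(project="dagster-infra"),
                    include_stdio_in_messages=True,
                ),
                context_injector=PipesGCSContextInjector(
                    bucket="dagster-pipes",
                    client=GCSClient(project="dagster-infra"),
                ),
            )
        },
    )
    assert result.success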