October 6, 2022 • 5 min read

End-to-end data pipeline tests on Databricks

Written by Pierre Courgeon

Data pipeline testing is evolving quickly, but there is still a big gap to bridge with software engineering best practices. In this article, we will see how to implement an end-to-end test of a Databricks Job and what the positive and negative impacts have been on my project.

The 2 types of tests

First, let's do a small recap on data pipelines and tests. As this article explains, there are 2 types of tests for data pipelines:

  • Data quality tests: their goal is to test the quality of the data. For instance, you can test that a column "price" only contains values greater than zero (a minimal example follows this list). These tests are run every time the data pipeline runs. You can perform data quality tests with tools like dbt, Great Expectations, or custom code.
  • Standard tests: their goal is to test the code logic. These tests are run every time the codebase is modified. Unit tests and end-to-end tests belong to this category. Most of the time, a data pipeline can only run on cloud-based products like Databricks or a data warehouse. In that case, off-the-shelf frameworks like pytest cannot be used on their own, so you need custom code.
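To illustrate the first bullet, such a "price" check could also be written as custom PySpark code executed by the pipeline. Below is a minimal sketch; the "clean.prices" table and "price" column are illustrative:

# Minimal data quality check (sketch) -- "spark" is the SparkSession provided by a Databricks notebook
from pyspark.sql import functions as F

prices = spark.table("clean.prices")  # illustrative table name
bad_rows = prices.filter(F.col("price") <= 0).count()
assert bad_rows == 0, f"{bad_rows} rows have a non-positive price"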

Now, let's see how to implement an end-to-end test on a Databricks job. (If you want to know how to do unit tests on Databricks, read this article about Databricks Delta Live Tables.)

Our data pipeline

We use a Databricks Job called “my_pipeline_job” to load data from the “raw” Mongo database, perform transformations, and finally save the tables into the “clean” Databricks database.

Data pipeline on Databricks

Each task of “my_pipeline_job” is a notebook located in our remote repository on GitHub (see Git integration with Databricks if you want to know more).

Finally, note that we deploy our job using a CI/CD tool. We chose Jenkins but it could have been any other CI/CD tool (GitHub Actions, GitLab, …).

What do we want to achieve?

Let’s say we create a branch named “feature_1”, then modify the codebase and finally open a pull request (pr_0001) on the “dev” branch.
This pull request triggers an end-to-end test of “my_pipeline_job” in order to test the code of the “feature_1” branch. We want this end-to-end test to run on Databricks and test everything from reading from Mongo to writing into the “clean” database. It consists of the following 3 steps (see the schema below):

  1. Run “pr_0001_init_test_job”, which writes input data (defined in the code beforehand, as it would be for a unit test) into the “pr_0001_raw” Mongo database
  2. Run “pr_0001_my_pipeline_job” so that it reads data from “pr_0001_raw” and creates tables in the “pr_0001_clean” database
  3. Run “pr_0001_assert_test_job”, which asserts that the “pr_0001_clean” tables are equal to the expected tables (also defined in the code beforehand)
End-to-end test of a Databricks job

Why is everything prefixed by “pr_0001”?

It is important to note that we created new databases and jobs, all prefixed by the pull request id (here “pr_0001”), because we do not want to overwrite the actual jobs and data! Plus, if someone else opens another pull request, let’s say “pr_0002”, we don’t want that pull request to overwrite the data and jobs generated by “pr_0001”.

Implement end-to-end tests step by step

1. Create a branch

First, create a branch and open a pull request on the “dev” branch. Let’s say that the pull request id is “pr_0001”.

2. Write input data into Mongo

We need to create a Mongo database named “pr_0001_raw” and its collections.

  • First, create a notebook named “init_test_notebook”. To be able to reuse this notebook for other pull requests (e.g. to create “pr_0002_raw”), we parameterize it using Databricks widgets. A pseudo-code sketch is shown after this list.
  • Commit the notebook to your GitHub repository
  • Then, create a job named “pr_0001_init_test_job” which is only composed of “init_test_notebook”
  • Configure the job to use the “init_test_notebook” located in your GitHub repository in the “feature_1” branch.
  • Finally, run the job with the following parameter: {“pr_id”: “pr_0001”}
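Here is a minimal pseudo-code sketch of “init_test_notebook”. The “orders” collection, its documents, and the secret scope holding the Mongo connection string are illustrative assumptions:

# init_test_notebook (sketch) -- writes hard-coded input data into the "<pr_id>_raw" Mongo database
from pymongo import MongoClient

# Read the pull request id passed as a job parameter, e.g. "pr_0001"
dbutils.widgets.text("pr_id", "")
pr_id = dbutils.widgets.get("pr_id")

# Illustrative: the Mongo connection string is stored in a Databricks secret scope
mongo_uri = dbutils.secrets.get(scope="my_scope", key="mongo_uri")
client = MongoClient(mongo_uri)

# Input data defined in the code, as for a unit test
input_orders = [
    {"order_id": 1, "price": 10.5},
    {"order_id": 2, "price": 3.0},
]

# Write the fixtures into the "<pr_id>_raw" database, e.g. "pr_0001_raw"
raw_db = client[f"{pr_id}_raw"]
raw_db["orders"].drop()
raw_db["orders"].insert_many(input_orders)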

You should now have a Mongo database named “pr_0001_raw” containing the collections you defined in your notebook.

3. Run the job being tested

Now, we want to run “my_pipeline_job” so that it reads the tables inside the “pr_0001_raw” database and writes the results into the “pr_0001_clean” Databricks database.

  • Parameterize the notebooks used by “my_pipeline_job” with the “pr_id”. A sketch of what one of these notebooks might look like is shown after this list.
  • Commit the notebook to your repository
  • Create a copy of “my_pipeline_job” named “pr_0001_my_pipeline_job”. Configure the job to use the notebooks located in your GitHub repository in the “feature_1” branch.
  • Finally, run the job with the following parameter: {“pr_id”: “pr_0001”}
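As a rough sketch, here is what one of these notebooks might look like. It assumes the MongoDB Spark connector is installed on the cluster (with the connection URI set in the Spark configuration); the “orders” collection and the transformation are illustrative:

# my_pipeline_notebook (sketch) -- reads from the raw Mongo database and writes to the clean Databricks database
from pyspark.sql import functions as F

dbutils.widgets.text("pr_id", "")
pr_id = dbutils.widgets.get("pr_id")
prefix = f"{pr_id}_" if pr_id else ""  # "pr_0001_" during a test, "" in production

# Read a collection from the "<prefix>raw" Mongo database
orders = (
    spark.read.format("mongodb")  # use "mongo" with older versions of the connector
    .option("database", f"{prefix}raw")
    .option("collection", "orders")
    .load()
)

# Illustrative transformation: keep only strictly positive prices
clean_orders = orders.filter(F.col("price") > 0)

# Write the result into the "<prefix>clean" Databricks database
spark.sql(f"CREATE DATABASE IF NOT EXISTS {prefix}clean")
clean_orders.write.mode("overwrite").saveAsTable(f"{prefix}clean.orders")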

You should now have a database named “pr_0001_clean” with tables inside.

4. Check the output tables

Now we want to assert that the tables in the “pr_0001_clean” database are equal to what we expected.

  • Let’s create a notebook named “assert_test_notebook” (a sketch is shown after this list)
  • Commit the notebook to your repository
  • Then, create a job named “pr_0001_assert_test_job” that uses the “assert_test_notebook” located in your GitHub repository in the “feature_1” branch.
  • Finally, run the job with the following parameter: {“pr_id”: “pr_0001”}
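Below is a minimal sketch of “assert_test_notebook”. The expected rows and the “orders” table are illustrative and should mirror the input data written by “init_test_notebook”:

# assert_test_notebook (sketch) -- compares the actual "clean" tables with expected tables defined in the code
dbutils.widgets.text("pr_id", "")
pr_id = dbutils.widgets.get("pr_id")

# Expected content of the "orders" clean table (illustrative schema)
expected_orders = spark.createDataFrame(
    [(1, 10.5), (2, 3.0)],
    ["order_id", "price"],
)

actual_orders = spark.table(f"{pr_id}_clean.orders").select("order_id", "price")

# Fail the notebook (and therefore the job) if the actual table differs from the expected one
assert actual_orders.count() == expected_orders.count(), "Row counts differ"
assert actual_orders.exceptAll(expected_orders).count() == 0, "Unexpected rows in the actual table"
assert expected_orders.exceptAll(actual_orders).count() == 0, "Missing rows in the actual table"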

It should succeed if the actual tables are equal to the expected tables that we defined in the test.

Congrats! You have run your first end-to-end test on Databricks!

5. Automate everything with a CI/CD tool

Now, let’s use Jenkins (or any CI/CD tool) to automate all the previous steps that you have done manually. As a result, an end-to-end test of “my_pipeline_job” will be run whenever a pull request is opened on the “dev” branch.

5.1 Jenkinsfile

We use the Jenkins Declarative Pipeline syntax, so you need to create a file named “Jenkinsfile”.

There are 3 important environment variables that we use in the Jenkins pipelines:

  • BRANCH_NAME: env var whose value is “PR-XXXX” when a pull request is opened
  • CHANGE_TARGET: the target branch of the pull request (e.g. “dev”)
  • CHANGE_BRANCH: the branch that we want to merge into the target branch (e.g. “feature_1”)

Then create a function named “isPullRequestOnDev()” that detects when a pull request on the “dev” branch is opened (see code below).

The Jenkinsfile should look like this:
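(Below is a minimal sketch, assuming a Jenkins multibranch pipeline; the stage layout is illustrative, and the “sh” steps described in the next sections go inside the stage.)

// Jenkinsfile (sketch) -- Declarative Pipeline skeleton for a multibranch pipeline
def isPullRequestOnDev() {
    // Multibranch builds set BRANCH_NAME to "PR-XXXX" and CHANGE_TARGET to the target branch of the pull request
    return env.BRANCH_NAME ==~ /PR-\d+/ && env.CHANGE_TARGET == 'dev'
}

pipeline {
    agent any
    stages {
        stage('End-to-end test') {
            when { expression { isPullRequestOnDev() } }
            steps {
                // Deploy and run init_test_job, my_pipeline_job and assert_test_job
                // with the "sh" commands shown in sections 5.2 to 5.5
                echo "Running end-to-end test for branch ${env.CHANGE_BRANCH}"
            }
        }
    }
}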

5.2 Deploy init_test_job

Now, we want to deploy “init_test_job” to Databricks using the Databricks Jobs API.

First, we need to create the configuration file of the job named “init_test_job.json” (see below). It will be put into the payload of the API call.
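A minimal version of “init_test_job.json” might look like the following (Jobs API 2.1 format); the Git URL, notebook path, and cluster settings are placeholders to adapt to your workspace:

{
  "name": "init_test_job",
  "git_source": {
    "git_url": "https://github.com/my-org/my-repo",
    "git_provider": "gitHub",
    "git_branch": ""
  },
  "tasks": [
    {
      "task_key": "init_test",
      "notebook_task": {
        "notebook_path": "notebooks/init_test_notebook",
        "source": "GIT"
      },
      "new_cluster": {
        "spark_version": "10.4.x-scala2.12",
        "node_type_id": "Standard_DS3_v2",
        "num_workers": 1
      }
    }
  ]
}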

Tip: you can get the JSON configuration of an existing job with the following command: databricks jobs get --job-id <job_id>

Note that the “git_branch” is empty. Jenkins will complete it at runtime.

Your repository should look like this:

jobs/
|-- init_test_job.json
|-- my_pipeline_job.json
|-- assert_test_job.json
notebooks/
|-- init_test_notebook.py
|-- my_pipeline_notebook.py
|-- assert_test_notebook.py
Jenkinsfile

Now, let’s create a Python module named “create_or_update_job.py” that reads the “init_test_job.json” file, fills it with the right values, and creates (or updates) the job on Databricks. We use the click library because it lets Jenkins execute the Python module with command-line arguments.
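Here is a minimal sketch based on the requests library and the Databricks Jobs API 2.1. The DATABRICKS_HOST and DATABRICKS_TOKEN environment variables and the “--config_path” option are assumptions of this sketch:

# create_or_update_job.py (sketch) -- deploys a job defined in a JSON file to Databricks
import json
import os

import click
import requests

HOST = os.environ["DATABRICKS_HOST"].rstrip("/")
HEADERS = {"Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}"}


def get_job_id(job_name):
    """Return the job_id of an existing job with this name, or None."""
    resp = requests.get(f"{HOST}/api/2.1/jobs/list", headers=HEADERS, params={"name": job_name})
    resp.raise_for_status()
    jobs = resp.json().get("jobs", [])
    return jobs[0]["job_id"] if jobs else None


@click.command()
@click.option("--branch_name", required=True, help="Git branch holding the notebooks, e.g. feature_1")
@click.option("--job_prefix", required=True, help="Prefix added to the job name, e.g. pr_0001_")
@click.option("--config_path", default="jobs/init_test_job.json", help="Path to the job configuration file")
def create_or_update_job(branch_name, job_prefix, config_path):
    with open(config_path) as f:
        settings = json.load(f)

    # Fill in the values that are only known at runtime
    settings["name"] = job_prefix + settings["name"]
    settings["git_source"]["git_branch"] = branch_name

    job_id = get_job_id(settings["name"])
    if job_id is None:
        resp = requests.post(f"{HOST}/api/2.1/jobs/create", headers=HEADERS, json=settings)
    else:
        resp = requests.post(
            f"{HOST}/api/2.1/jobs/reset",
            headers=HEADERS,
            json={"job_id": job_id, "new_settings": settings},
        )
    resp.raise_for_status()


if __name__ == "__main__":
    create_or_update_job()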

Now, we can call the previous code snippet in the Jenkinsfile as follows:

sh "python create_or_update_job.py --branch_name ${CHANGE_BRANCH} --job_prefix ${BRANCH_NAME + '_'}"

5.3 Run “init_test_job”

Now, let’s create a Python module named “run_job.py” that Jenkins will call to run a Databricks job and wait for its termination by polling the run state.
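Here is a minimal sketch, with the same DATABRICKS_HOST / DATABRICKS_TOKEN assumptions as above. It looks the job up by name, triggers a run, and then polls the run state every 30 seconds:

# run_job.py (sketch) -- triggers a Databricks job by name and waits for the run to terminate
import os
import time

import click
import requests

HOST = os.environ["DATABRICKS_HOST"].rstrip("/")
HEADERS = {"Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}"}


@click.command()
@click.option("--job_name", required=True, help="Name of the job to run, e.g. pr_0001_init_test_job")
def run_job(job_name):
    # Find the job_id from the job name
    resp = requests.get(f"{HOST}/api/2.1/jobs/list", headers=HEADERS, params={"name": job_name})
    resp.raise_for_status()
    job_id = resp.json()["jobs"][0]["job_id"]

    # Trigger a run ("notebook_params" could be added here to pass the pr_id widget value)
    resp = requests.post(f"{HOST}/api/2.1/jobs/run-now", headers=HEADERS, json={"job_id": job_id})
    resp.raise_for_status()
    run_id = resp.json()["run_id"]

    # Poll until the run reaches a terminal state
    while True:
        resp = requests.get(f"{HOST}/api/2.1/jobs/runs/get", headers=HEADERS, params={"run_id": run_id})
        resp.raise_for_status()
        state = resp.json()["state"]
        if state["life_cycle_state"] in ("TERMINATED", "SKIPPED", "INTERNAL_ERROR"):
            break
        time.sleep(30)

    # Fail the Jenkins step if the run did not succeed
    if state.get("result_state") != "SUCCESS":
        raise SystemExit(f"Job {job_name} failed with state: {state}")


if __name__ == "__main__":
    run_job()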

Add the following line to the Jenkinsfile:

sh "python run_job.py --job_name ${BRANCH_NAME + '_' + 'init_test_job'}"

5.4 Deploy and run my_pipeline_job

Same as 5.2 and 5.3.

5.5 Deploy and run assert_test_job

Same as 5.2 and 5.3.

5.6 Clean everything

Now, let’s clean up the jobs and data that have been generated for the tests. What I recommend is to create a notebook and schedule it to run once a week (for instance). We do not want to run it at the end of each Jenkins pipeline run because the data can be useful for debugging failed tests.
This notebook should do the following (see the sketch after this list):

  • Delete the test jobs using the Databricks Jobs API
  • Delete the test Databricks databases with a command such as: spark.sql("DROP DATABASE <database_name> CASCADE")
  • Delete the test Mongo databases using the PyMongo library
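Here is a minimal sketch of such a cleanup notebook. The secret scope, the “pr_” prefix convention, and the absence of Jobs API pagination handling are assumptions of this sketch:

# cleanup notebook (sketch) -- removes the jobs and databases created by pull request tests
import requests
from pymongo import MongoClient

# Illustrative: credentials stored in a Databricks secret scope
host = dbutils.secrets.get(scope="my_scope", key="databricks_host").rstrip("/")
token = dbutils.secrets.get(scope="my_scope", key="databricks_token")
mongo_uri = dbutils.secrets.get(scope="my_scope", key="mongo_uri")
headers = {"Authorization": f"Bearer {token}"}

# 1. Delete the test jobs via the Databricks Jobs API
resp = requests.get(f"{host}/api/2.1/jobs/list", headers=headers)
resp.raise_for_status()
for job in resp.json().get("jobs", []):
    if job["settings"]["name"].startswith("pr_"):
        requests.post(f"{host}/api/2.1/jobs/delete", headers=headers, json={"job_id": job["job_id"]})

# 2. Drop the test Databricks databases
for db in spark.catalog.listDatabases():
    if db.name.startswith("pr_"):
        spark.sql(f"DROP DATABASE {db.name} CASCADE")

# 3. Drop the test Mongo databases
client = MongoClient(mongo_uri)
for db_name in client.list_database_names():
    if db_name.startswith("pr_"):
        client.drop_database(db_name)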

Impact of tests on the project

Advantages

In this project, the complexity of the data pipeline had grown a lot, and almost every modification of the data pipeline introduced a new bug. End-to-end tests allowed us to catch many bugs before they reached production.
In addition, writing tests forced developers to improve their understanding of the data and business rules. This helped improve communication between the developers and the product owner.

Challenges

Since we only started writing tests once the data pipeline had become very complex, we had a lot of “clean” tables to cover. We addressed this problem by prioritizing which “clean” tables to test first.
Another disadvantage is that the end-to-end tests slowed down the CI/CD pipeline: we had to wait around 20 minutes before merging any pull request. We could have improved the way we manage clusters and jobs to shorten this time, but we did not get the chance to implement it.

Conclusion

There is no reason why data pipelines should not follow software engineering best practices. Every data pipeline should be tested, and not only with data quality tests! End-to-end tests are also very important because they drastically reduce the risk of introducing new bugs in production when the codebase is modified. We have seen that implementing end-to-end tests on Databricks is fairly easy, so give it a try on your project!

Are you looking for Data Engineering Experts? Don't hesitate to contact us!

This article was written by Pierre Courgeon