Quick Start

A common task in data engineering is to automate, schedule, and monitor multiple data processing pipelines. This is called orchestration. maestro is an R package that helps orchestrate data pipelines.

A fully realized maestro project involves the following components and actions:

  1. A collection of pipelines (R functions to be orchestrated, such as batch ETL jobs).

  2. An orchestrator: an R script or Quarto document that orchestrates the pipelines and monitors them.

  3. A process external to R that schedules the orchestrator (e.g., cron, Posit Connect).

Project Setup

library(maestro)

Create a maestro project, either in an existing project or as a new one, using create_maestro() or the New Project wizard in RStudio. This creates the orchestrator script and a pipelines folder containing one sample pipeline. Your project should look something like this (shown here with a second pipeline, another_pipe.R, already added):

maestro_project
├── maestro_project.Rproj
├── orchestrator.R
└── pipelines
    ├── my_pipe.R
    └── another_pipe.R

Pipelines

Pipelines are the jobs you want to automate, schedule, and monitor. For the most part, they’re regular R functions with a special sprinkling of comments.

Anatomy of a Pipeline

A pipeline is simply an R function with decorators called maestro tags. Maestro tags are special code comments used for communicating the scheduling and configuration of a pipeline to the orchestrator. Let’s take a quick look at the sample my_pipe.R:

#' my_pipe maestro pipeline
#'
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-05-24
#' @maestroTz UTC
#' @maestroLogLevel INFO

my_pipe <- function() {

  # Pipeline code
}

my_pipe is a function with an empty body, so right now it won’t do anything. maestro interprets the comments above it as “this function is scheduled to run every 1 day, starting at 2024-05-24 00:00:00 UTC”.
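
To make that reading concrete, the run times implied by the start time and frequency can be listed with base R. This is only an illustration of the schedule described above, not a call into maestro:

```r
# Start time and time zone taken from the tags in my_pipe.R
start_time <- as.POSIXct("2024-05-24", tz = "UTC")

# The first three run times implied by a frequency of "1 day"
next_runs <- seq(start_time, by = "1 day", length.out = 3)

format(next_runs, "%Y-%m-%d %H:%M:%S", tz = "UTC")
```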

Note that you don’t need to provide all of these tags. A single maestro tag is enough to mark a function as a pipeline. Pipelines with missing tags fall back to consistent defaults (e.g., if maestroFrequency is missing, the default is 1 day).
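
As a rough picture of how tags, pipeline detection, and defaults fit together, here is a minimal base R sketch. The helper read_maestro_tags() is hypothetical and is not how maestro itself reads tags; the only default filled in is the 1 day frequency mentioned above.

```r
# Illustrative only: this is not maestro's own tag parser.
pipeline_lines <- c(
  "#' my_pipe maestro pipeline",
  "#'",
  "#' @maestroStartTime 2024-05-24",
  "#' @maestroTz UTC",
  "",
  "my_pipe <- function() {",
  "}"
)

# Hypothetical helper: collect `@maestroXyz value` pairs from comment lines
read_maestro_tags <- function(lines) {
  pattern <- "^#'\\s*@(maestro[A-Za-z]+)\\s*(.*)$"
  tag_lines <- grep(pattern, lines, value = TRUE)
  values <- trimws(sub(pattern, "\\2", tag_lines))
  names(values) <- sub(pattern, "\\1", tag_lines)
  as.list(values)
}

tags <- read_maestro_tags(pipeline_lines)

# At least one maestro tag is enough to count as a pipeline
is_pipeline <- length(tags) > 0

# The frequency tag is absent here, so use the default named in the text
if (is.null(tags$maestroFrequency)) tags$maestroFrequency <- "1 day"
```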

In most use cases, the actual code inside of my_pipe would be to run an ETL job (extract data from a source, transform it, and load it into a file system or database). In technical terms, it’s the side effect of the code and not its return value that is important.

Here’s a more realistic, albeit impractical, example:

#' my_pipe maestro pipeline
#'
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-05-24
#' @maestroTz UTC
#' @maestroLogLevel INFO

my_pipe <- function() {

  random_data <- data.frame(
    letters = sample(letters, 10),
    numbers = sample.int(10)
  )
  
  write.csv(random_data, file = tempfile())
}
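
To see the side effect versus return value distinction in practice, the same function body can be run by hand outside the orchestrator. The sketch below writes to a known path (out_path is a name introduced here for illustration) so the result can be inspected:

```r
# Write to a known location so the output can be inspected afterwards
out_path <- file.path(tempdir(), "random_data.csv")

my_pipe <- function() {
  random_data <- data.frame(
    letters = sample(letters, 10),
    numbers = sample.int(10)
  )
  write.csv(random_data, file = out_path, row.names = FALSE)
}

# The return value is NULL; the useful result is the file on disk
result <- my_pipe()
written <- read.csv(out_path)
```

Here result carries nothing of interest, while written holds the ten rows the pipeline produced.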

Adding New Pipelines

A project with a single pipeline is fine, but maestro is more useful when you have multiple jobs to run. You can add more pipelines to your pipelines directory manually or use create_pipeline():

create_pipeline(
  pipe_name = "another_pipeline",
  pipeline_dir = "pipelines",
  frequency = "1 hour",
  start_time = "2024-05-17 15:00:00",
  tz = "America/Halifax",
  log_level = "ERROR"
)
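
For the manual route, a pipeline file can be written by hand. The sketch below assumes the tags mirror those in the sample my_pipe.R and uses a temporary directory in place of the project's pipelines folder; it does not claim to reproduce the exact file that create_pipeline() generates.

```r
# A temporary stand-in for the project's pipelines folder
pipeline_dir <- file.path(tempdir(), "pipelines")
dir.create(pipeline_dir, showWarnings = FALSE, recursive = TRUE)

pipe_file <- file.path(pipeline_dir, "another_pipeline.R")

# Tags follow the pattern of my_pipe.R (assumed layout)
writeLines(c(
  "#' another_pipeline maestro pipeline",
  "#'",
  "#' @maestroFrequency 1 hour",
  "#' @maestroStartTime 2024-05-17 15:00:00",
  "#' @maestroTz America/Halifax",
  "#' @maestroLogLevel ERROR",
  "",
  "another_pipeline <- function() {",
  "",
  "  # Pipeline code",
  "}"
), pipe_file)

# Confirm the file is valid R before handing it to the orchestrator
parsed <- parse(pipe_file)
```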

Orchestrator

The orchestrator is the process that schedules and monitors the pipelines.

Anatomy of the Orchestrator

The orchestrator can be an R script or a Quarto/R Markdown document; here we’ll use a regular R script. This is where you’ll run maestro functions. The two main functions are build_schedule() and run_schedule().

library(maestro)

schedule_table <- build_schedule()

output <- run_schedule(
  schedule_table,
  orch_frequency = "1 hour"
)

When building the schedule, maestro looks through the pipelines in the pipelines folder and creates a data.frame of schedule information. You then pass that data.frame to run_schedule(), along with how often the orchestrator is supposed to run. It is important to tell maestro how often it will check the pipelines via the orch_frequency parameter. Here, we’re telling it that the orchestrator runs every 1 hour.
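
One way to see why the orchestrator frequency matters is to think of each orchestrator run as checking a time window for scheduled pipeline runs. The sketch below is a conceptual illustration only: is_due() is a hypothetical helper, and maestro's actual scheduling logic may differ.

```r
# Hypothetical helper, not part of maestro: does a scheduled run of the
# pipeline fall inside the window (window_start, window_end]?
is_due <- function(start_time, freq_secs, window_start, window_end) {
  if (window_end < start_time) return(FALSE)
  elapsed <- as.numeric(difftime(window_end, start_time, units = "secs"))
  # Most recent scheduled run at or before the end of the window
  latest_run <- start_time + floor(elapsed / freq_secs) * freq_secs
  latest_run > window_start
}

start_time <- as.POSIXct("2024-05-24 00:00:00", tz = "UTC")
checked_at <- as.POSIXct("2024-05-25 00:30:00", tz = "UTC")
one_hour <- 3600
one_day <- 86400

# The hourly window ending at 00:30 contains the daily run at 00:00
due_now <- is_due(start_time, one_day, checked_at - one_hour, checked_at)

# The hourly window from 01:30 to 02:30 contains no daily run
due_later <- is_due(start_time, one_day,
                    checked_at + one_hour, checked_at + 2 * one_hour)
```

If the window length assumed by the check did not match how often the script really runs, scheduled runs could be missed or picked up twice, which is why the two need to agree.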

Importantly, it isn’t maestro’s job to actually run the orchestrator this often. It’s your job to make sure it runs at that frequency (e.g., by deploying it via cron or in a cloud environment where code can be scheduled). See note 1 below.


  1. The decision about how often to run the orchestrator depends on the frequencies of the pipelines in the project. The simplest guideline is to take the frequency of your most frequently recurring pipeline and halve it. For example, if the most frequent pipeline runs every 1 day, the orchestrator should run every 12 hours.
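
The guideline in this note is simple arithmetic. A minimal sketch, using made-up pipeline frequencies expressed in hours (the pipeline names and the weekly frequency are assumptions for illustration):

```r
# Hypothetical pipeline frequencies, in hours
pipeline_freq_hours <- c(my_pipe = 24, another_pipe = 168)

# Halve the frequency of the most frequently recurring pipeline
orch_freq_hours <- min(pipeline_freq_hours) / 2

# A label for the result; check the accepted format before passing
# a string like this to orch_frequency
orch_frequency <- paste(orch_freq_hours, "hours")
```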