---
title: "Use Cases"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Use Cases}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

## Hourly Pipeline

This pipeline is an example of a standard *extract transform load* (ETL) workflow. The pipeline is scheduled to run every **3 hours** starting on **2024-04-25** at **05:45:00**.

The goal of the pipeline is to perform the following:

- access an online hosted CSV file
- perform light data wrangling
- write the file to local storage in parquet format

This example is set up as a simple sequence of tasks, each creating objects that are used by the tasks that follow. All components of the pipeline are within the `pipeline_wildfire_hourly` function, which has no parameters.

```{r eval = FALSE, message = FALSE, warning = FALSE}
#' pipeline_wildfire_hourly maestro pipeline
#'
#' @maestroFrequency 3 hour
#' @maestroStartTime 2024-04-25 05:45:00
#' @maestroTz America/Halifax
pipeline_wildfire_hourly <- function() {

  # load libraries
  library(dplyr)
  library(readr)
  library(sf)
  library(sfarrow)

  # Access active wildfire data from hosted csv
  df <- readr::read_csv("https://cwfis.cfs.nrcan.gc.ca/downloads/activefires/activefires.csv")

  # Data wrangling: add an insert datetime and convert to a spatial (sf) object
  df_geom <- df |>
    dplyr::mutate(insert_datetime = Sys.time()) |>
    sf::st_as_sf(coords = c("lon", "lat"), crs = 4326)

  # Write active wildfires to file
  basename <- paste("cdn_wildfire", as.integer(Sys.time()), sep = "_")
  df_geom |>
    sfarrow::write_sf_dataset(
      "~/data/wildfires",
      format = "parquet",
      basename_template = paste0(basename, "-{i}.parquet")
    )
}
```

## Daily Pipeline

This pipeline is an example of a standard *extract transform load* (ETL) workflow. The pipeline is scheduled to run every **day** starting on **2024-04-25** at **06:30:00**.

The goal of the pipeline is to perform the following:

- submit a request to an API
- extract data from the API
- add an insert datetime column
- write the file to local storage in parquet format

This example uses a custom function to access and extract the data from the API, and the result is piped into the remaining tasks. All components of the pipeline are within the `pipeline_climate_daily` function, which has no parameters.
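The heart of the extract step is a single GET request to the GeoMet `climate-hourly` collection. The sketch below shows that request on its own, using the same station identifier (`8202251`) and query parameters as the pipeline; it is intended for interactive exploration (e.g. confirming the API is reachable and inspecting the returned features) rather than as part of the scheduled pipeline.

```{r eval = FALSE, message = FALSE, warning = FALSE}
library(httr2)
library(dplyr)

# Build and perform the same GeoMet request the pipeline uses:
# hourly records for one station, for yesterday's local date
resp <- httr2::request("https://api.weather.gc.ca/collections/climate-hourly/items") |>
  httr2::req_url_query(
    lang = "en-CA",
    offset = 0,
    CLIMATE_IDENTIFIER = 8202251,
    LOCAL_DATE = Sys.Date() - 1,
    limit = 24
  ) |>
  httr2::req_perform()

# Parse the GeoJSON body and peek at the feature table the pipeline writes out
features <- httr2::resp_body_json(resp, simplifyVector = TRUE)$features
dplyr::glimpse(features)
```

The full pipeline wraps this request in a small helper function, adds an insert datetime column, and writes the result to parquet: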
```{r eval = FALSE, message = FALSE, warning = FALSE}
#' pipeline_climate_daily maestro pipeline
#'
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-04-25 06:30:00
#' @maestroTz America/Halifax
pipeline_climate_daily <- function() {

  # load libraries
  library(dplyr)
  library(httr2)
  library(arrow)

  # Custom function for accessing api climate data
  get_hourly_climate_info <- function(station_id, request_limit = 24) {

    # Validate parameters
    stopifnot("`station_id` must be a positive number" = is.numeric(station_id) && station_id > 0)
    stopifnot("`station_id` must be a length-one vector" = length(station_id) == 1)

    # Access climate hourly via geomet api
    hourly_req <- httr2::request("https://api.weather.gc.ca/collections/climate-hourly/items") |>
      httr2::req_url_query(
        lang = "en-CA",
        offset = 0,
        CLIMATE_IDENTIFIER = station_id,
        LOCAL_DATE = Sys.Date() - 1,
        limit = request_limit
      )

    # Perform the request
    hourly_resp <- hourly_req |>
      httr2::req_perform()

    # Climate station response to data frame
    geomet_json <- hourly_resp |>
      httr2::resp_body_json(simplifyVector = TRUE)

    geomet_json$features
  }

  # Write climate hourly to file
  basename <- paste("climate_hourly", as.integer(Sys.time()), sep = "_")
  get_hourly_climate_info(8202251) |>
    dplyr::mutate(insert_datetime = Sys.time()) |>
    arrow::write_dataset(
      "~/data/climate",
      format = "parquet",
      basename_template = paste0(basename, "-{i}.parquet")
    )
}
```
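
## Running the Pipelines

Neither pipeline runs itself; a maestro orchestrator is responsible for calling them on schedule. The chunk below is a minimal sketch, assuming both functions above are saved as scripts in a `pipelines/` directory and that the orchestrator script itself is executed every hour (for example via cron or a similar scheduler).

```{r eval = FALSE, message = FALSE, warning = FALSE}
library(maestro)

# Parse the maestro tags (frequency, start time, time zone) from every
# pipeline script in the pipelines/ directory
schedule <- build_schedule(pipeline_dir = "pipelines")

# Run only the pipelines that are due, given that this orchestrator runs hourly
run_schedule(
  schedule,
  orch_frequency = "1 hour"
)
```

Because `run_schedule()` runs only the pipelines that are due in the current orchestrator interval, the wildfire pipeline fires on its three-hour cadence while the climate pipeline fires once a day.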