# Awehflow


Configuration-based Airflow pipelines with metric logging and alerting out of the box.
## Prerequisites
You will need the following to run this code:
* Python 3
## Installation
```
pip install awehflow[default]
```
If you are installing on Google Cloud Composer with Airflow 1.10.2:
```
pip install awehflow[composer]
```
## Usage
Usage of `awehflow` can be broken up into two parts: bootstrapping and the configuration of pipelines.
### Bootstrap
In order to expose the generated pipelines (`airflow` _DAGs_) for `airflow` to pick up when scanning for _DAGs_, one has to create a `DagLoader` that points to a folder where the pipeline configuration files will be located:
```python
import os
from awehflow.alerts.slack import SlackAlerter
from awehflow.core import DagLoader
from awehflow.events.postgres import PostgresMetricsEventHandler
"""airflow doesn't pick up DAGs in files unless
the words 'airflow' and 'DAG' features"""
configs_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'configs')
metrics_handler = PostgresMetricsEventHandler(jobs_table='jobs', task_metrics_table='task_metrics')
slack_alerter = SlackAlerter(channel='#airflow')
loader = DagLoader(
    project="awehflow-demo",
    configs_path=configs_path,
    event_handlers=[metrics_handler],
    alerters=[slack_alerter]
)
dags = loader.load(global_symbol_table=globals())
```
As seen in the code snippet, one can also pass in _"event handlers"_ and _"alerters"_ to perform actions on certain pipeline events and potentially alert the user of certain events on a given channel. See the sections below for more detail.
The global symbol table needs to be passed to the `loader` since `airflow` scans it for objects of type `DAG`, and then synchronises the state with its own internal state store.
\*_caveat_: `airflow` ignores `python` files that don't contain the words _"airflow"_ and _"DAG"_. It is thus advised to put those words in a comment to ensure the generated _DAGs_ get picked up when the `DagBag` is getting filled.
#### Event Handlers
As a pipeline generated using `awehflow` is running, certain events get emitted. An event handler gives the user the option of running code when these events occur.
The following events are potentially emitted as a pipeline runs:
* `start`
* `success`
* `failure`
* `task_metric`
Existing event handlers include:
* `PostgresMetricsEventHandler`: persists pipeline metrics to a Postgres database
* `PublishToGooglePubSubEventHandler`: events get passed straight to a Google Pub/Sub topic
An `AlertsEventHandler` gets automatically added to a pipeline. Events get passed along to registered alerters.
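If the built-in handlers don't cover your needs, a custom handler can be passed to `DagLoader` via the `event_handlers` list shown in the bootstrap snippet above. The sketch below is purely illustrative: the `handle(event)` method name and the shape of the event payload are assumptions rather than the documented awehflow interface, so check the awehflow source for the actual event handler base class before relying on it.
```python
import logging


class LoggingEventHandler:
    """Hypothetical event handler that simply logs every pipeline event.

    The handle() method name and the event structure are assumptions;
    consult the awehflow source for the real event handler interface.
    """

    def handle(self, event):
        # event is assumed to carry at least the event name (e.g. 'start', 'failure')
        logging.info("awehflow event received: %s", event)
```
Such a handler would then be registered alongside the others, e.g. `event_handlers=[metrics_handler, LoggingEventHandler()]`.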
#### Alerters
An `Alerter` is merely a class that implements an `alert` method. By default a `SlackAlerter` is configured in the `dags/PROJECT/bootstrap.py` file of an awehflow project. awehflow supports the addition of multiple alerters, which allows success or failure events to be sent to multiple channels.
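As a minimal sketch, a custom alerter only needs to provide an `alert` method; the argument it receives (shown here generically as `event`) is an assumption, so check `awehflow.alerts.slack.SlackAlerter` in the awehflow source for the exact signature.
```python
import logging


class LogAlerter:
    """Hypothetical alerter that writes alerts to the application log.

    Only the existence of an alert() method is documented; its exact
    signature is assumed here.
    """

    def alert(self, event):
        # event is assumed to describe the pipeline event that triggered the alert
        logging.warning("awehflow alert: %s", event)
```
An instance can then be passed to `DagLoader` alongside the `SlackAlerter`, e.g. `alerters=[slack_alerter, LogAlerter()]`.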
##### YAML configuration
In order to add alerts to an awehflow DAG, add the following to the root of the configuration:
```YAML
alert_on:
- 'failure' # Send out a formatted message if a task in the DAG fails. This is optional
- 'success' # Send out a formatted message once the DAG completes successfully. This is optional
```
##### Available alerters
###### `SlackAlerter` - `awehflow.alerts.slack.SlackAlerter`
Sends an alert to a specified Slack channel via the Slack webhook functionality.
- Parameters
  - `channel` - The name of the channel that the alerts should be sent to
  - `slack_conn_id` - The name of the Airflow connection that contains the token information, default: `slack_default`
- Connection requirements - Create an HTTP connection with the name specified for `slack_conn_id` (see the CLI example below); the required HTTP fields are:
  - `password` - The Slack token issued by your admin team, which allows for the sending of messages via the Slack Python API
###### `GoogleChatAlerter` - `awehflow.alerts.googlechat.GoogleChatAlerter`
Sends an alert to the configured Google Chat space.
- Parameters
  - `gchat_conn_id` - The name of the Airflow connection that contains the GChat space information, default: `gchat_default`
- Connection requirements - Create an HTTP connection with the name specified for `gchat_conn_id`; the required HTTP fields are:
  - `host` - The GChat spaces URL: `https://chat.googleapis.com/v1/spaces`
  - `password` - The GChat spaces key configuration information, e.g. `https://chat.googleapis.com/v1/spaces/SPACES_ID?key=SPACES_KEY`
    - `SPACES_ID` - Should be supplied by your GChat admin team
    - `SPACES_KEY` - Should be supplied by your GChat admin team
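Both connections can be created in the Airflow UI, or from the command line. The sketch below assumes the Airflow 2 CLI (the `airflow connections` syntax differs on Airflow 1.10); the token and space values are placeholders supplied by your admin team:
```bash
# Slack: only the password (token) field is required
airflow connections add slack_default \
    --conn-type http \
    --conn-password '<your-slack-token>'

# Google Chat: host is the spaces URL, password holds the spaces key configuration
airflow connections add gchat_default \
    --conn-type http \
    --conn-host 'https://chat.googleapis.com/v1/spaces' \
    --conn-password '<spaces key configuration, see above>'
```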
### Configuration
Awehflow configuration files can be written as either `.yml` or `.hocon` files; both formats are supported.
Shown below is a sample HOCON configuration file:
```hocon
{
  name: my_first_dag,
  start_date: 2022-01-01,
  catchup: true,
  schedule: "10 0 * * *",
  version: 1,
  alert_on: [
    success,
    failure
  ],
  params: {
    default: {
      source_folder: /tmp
    },
    production: {
      source_folder: /data
    }
  },
  default_dag_args: {
    retries: 1
  },
  dependencies: [
    {
      id: 'ping_sensor'
      operator: 'airflow.sensors.bash.BashSensor'
      params: {
        bash_command: 'echo ping'
        mode: 'reschedule'
      }
    }
  ],
  tasks: [
    {
      id: first_dummy_task,
      operator: airflow.operators.dummy.DummyOperator,
    },
    {
      id: first_bash_task,
      operator: airflow.operators.bash.BashOperator,
      params: {
        bash_command: 'echo "Hello World"'
      },
      upstream: [
        first_dummy_task
      ]
    }
  ]
}
```
This configuration does the following:
- Creates a DAG called `my_first_dag`
- Scheduled to run daily at 10 minutes past midnight
- Catchup has been enabled to ensure all runs of the DAG since 2022-01-01 are executed
- Dependencies
  - First check that the command `echo ping` succeeds
- Tasks
  - First run a dummy task that does nothing
  - If the dummy task succeeds, execute the bash command
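Since both formats are supported, the same pipeline can also be written as a `.yml` file. The snippet below is a sketch of the direct YAML translation of the HOCON example above, assuming a one-to-one mapping of keys:
```YAML
name: my_first_dag
start_date: 2022-01-01
catchup: true
schedule: '10 0 * * *'
version: 1
alert_on:
  - success
  - failure
params:
  default:
    source_folder: /tmp
  production:
    source_folder: /data
default_dag_args:
  retries: 1
dependencies:
  - id: ping_sensor
    operator: airflow.sensors.bash.BashSensor
    params:
      bash_command: echo ping
      mode: reschedule
tasks:
  - id: first_dummy_task
    operator: airflow.operators.dummy.DummyOperator
  - id: first_bash_task
    operator: airflow.operators.bash.BashOperator
    params:
      bash_command: echo "Hello World"
    upstream:
      - first_dummy_task
```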
## Running the tests
Tests may be run with:
```bash
python -m unittest discover tests
```
or to run code coverage too:
```bash
coverage run -m unittest discover tests && coverage html
```