# airflow-oracle-snowflake-plugin
### Steps to use the OracleToSnowflake from the plugin
1. Install the plugin with `pip install airflow-oracle-snowflake-plugin`. You can add `airflow-oracle-snowflake-plugin` to your requirements.txt file for CI/CD pipelines. Installing the plugin also pulls in the following dependencies if they are not already satisfied:
* oracledb
* apache-airflow-providers-oracle
* apache-airflow-providers-snowflake
2. Create `config.py` inside the `dags/table_config` directory. This file holds the source and destination database table specifications and has the following structure:
```py
CONFIG = [
    {
        'source_schema': 'ADMIN',
        'source_table': 'CUSTOMERS',
        'destination_schema': 'PUBLIC',
        'destination_table': 'CUSTOMERS',
        'columns': [
            ('ID', 'varchar'),
            ('FULL_NAME', 'varchar'),
            ('ADDRESS', 'varchar'),
            ('EMAIL', 'varchar'),
            ('PHONE_NUMBER', 'varchar'),
        ],
    },
]
```
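Every entry needs all five keys shown above. A quick sanity check like the one below (a hypothetical helper, not part of the plugin) can catch a missing or misspelled key before the DAG is parsed:

```py
# Hypothetical helper (not shipped with the plugin): verify each CONFIG
# entry carries the keys the operator expects.
REQUIRED_KEYS = {
    'source_schema', 'source_table',
    'destination_schema', 'destination_table', 'columns',
}

def validate_config(config):
    for entry in config:
        missing = REQUIRED_KEYS - entry.keys()
        if missing:
            raise ValueError(
                "entry for {} is missing keys: {}".format(
                    entry.get('source_table', '<unknown>'), sorted(missing)
                )
            )

validate_config([{
    'source_schema': 'ADMIN',
    'source_table': 'CUSTOMERS',
    'destination_schema': 'PUBLIC',
    'destination_table': 'CUSTOMERS',
    'columns': [('ID', 'varchar')],
}])  # passes silently when every key is present
```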
3. Import the operators, `sql_utils`, and the config in your DAG python file by including the following statements (`SnowflakeOperator` is used in the next step):
```py
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

from airflow_oracle_snowflake_plugin.oracle_to_snowflake_operator import OracleToSnowflake
import airflow_oracle_snowflake_plugin.utils.sql_utils as sql_utils
from table_config.config import CONFIG
```
4. Loop over the table configurations and create the DAG tasks using the operator as follows:
```py
for config in CONFIG:
    create_table_statement = sql_utils.get_create_statement(
        table_name=config.get('destination_table'),
        columns_definition=config.get('columns')
    )
    create_table_if_not_exists = SnowflakeOperator(
        task_id='create_{}'.format(config.get('destination_table')),
        snowflake_conn_id='SNOWFLAKE',
        sql=create_table_statement,
        warehouse='LANDING',
        database='LANDING_DEV',
        role='ACCOUNTADMIN',
        schema=config.get('destination_schema'),
        dag=dag
    )
    fill_table_statement = sql_utils.get_select_statement(
        table_name=config.get('source_table'),
        schema_name=config.get('source_schema'),
        columns_definition=config.get('columns'),
        sql_server_syntax=False
    )
    oracle_to_snowflake_operator = OracleToSnowflake(
        task_id='recreate_{}'.format(config.get('destination_table')),
        dag=dag,
        warehouse='LANDING',
        database='LANDING_DEV',
        role='ACCOUNTADMIN',
        schema='PUBLIC',
        source_schema=config.get('source_schema'),
        source_table=config.get('source_table'),
        destination_schema=config.get('destination_schema'),
        destination_table=config.get('destination_table'),
        fill_table_statement=fill_table_statement,
        snowflake_conn_id='SNOWFLAKE',
        oracle_conn_id='ORACLE',
        recreate_table=True
    )
    create_table_if_not_exists >> oracle_to_snowflake_operator
```
This script creates two tasks for each Oracle table you want to migrate, as determined by the `CONFIG` array in `config.py`.
#### First Task
The first task creates the table in the Snowflake database if it doesn't already exist, using the `SnowflakeOperator`. It requires:
* An existing Airflow connection to your Snowflake account
* Name of the warehouse to use ('LANDING' in the example above)
* Name of the database to use ('LANDING_DEV' in the example above)
* Name of the role to use ('ACCOUNTADMIN' in the example above)
* An SQL statement, provided here as the `create_table_statement` generated by the `sql_utils.get_create_statement` method. The method reads `CONFIG` and extracts the table name, columns, and their data types.
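For the example `CUSTOMERS` config, the generated DDL would presumably resemble the sketch below. This is an illustration of the idea, not the plugin's exact output; `build_create_statement` is a hypothetical stand-in for `sql_utils.get_create_statement`:

```py
def build_create_statement(table_name, columns_definition):
    # Join the (name, type) tuples from CONFIG into a column list and
    # wrap them in idempotent DDL. The real plugin may format differently.
    cols = ',\n    '.join(
        '{} {}'.format(name, dtype) for name, dtype in columns_definition
    )
    return 'CREATE TABLE IF NOT EXISTS {} (\n    {}\n);'.format(table_name, cols)

print(build_create_statement('CUSTOMERS', [('ID', 'varchar'), ('FULL_NAME', 'varchar')]))
# CREATE TABLE IF NOT EXISTS CUSTOMERS (
#     ID varchar,
#     FULL_NAME varchar
# );
```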
#### Second Task
The second task uses the `OracleToSnowflake` operator from the plugin. It selects the rows from the source table into a temporary CSV file, uploads the file to a Snowflake stage, and finally loads it into the destination table in Snowflake. It requires:
* Existing Airflow connection IDs for your Snowflake account and your Oracle database instance. The connection IDs default to `SNOWFLAKE` and `ORACLE` if not provided.
* Internally, the operator uses a custom Snowflake hook that uploads the CSV file to a Snowflake table. This hook requires:
  * Name of the warehouse to use (defaults to 'LANDING' if not provided)
  * Name of the database to use (defaults to 'LANDING_DEV' if not provided)
  * Name of the role to use (defaults to 'ACCOUNTADMIN' if not provided)
* An SQL statement, provided here as the `fill_table_statement` generated by the `sql_utils.get_select_statement` method. The method reads `CONFIG` and extracts the table name, schema, and columns.
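For the example config, the generated query would presumably look like a plain column-list `SELECT`, sketched below. `build_select_statement` is a hypothetical stand-in for `sql_utils.get_select_statement`, and the `sql_server_syntax` flag (which in the plugin would switch quoting/dialect details) is ignored here:

```py
def build_select_statement(table_name, schema_name, columns_definition,
                           sql_server_syntax=False):
    # Only the column names from the (name, type) tuples are needed for
    # the SELECT; the types are used by the CREATE statement instead.
    cols = ', '.join(name for name, _ in columns_definition)
    return 'SELECT {} FROM {}.{}'.format(cols, schema_name, table_name)

stmt = build_select_statement('CUSTOMERS', 'ADMIN',
                              [('ID', 'varchar'), ('EMAIL', 'varchar')])
print(stmt)
# SELECT ID, EMAIL FROM ADMIN.CUSTOMERS
```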
### Note
Tags have been added to the repository to facilitate version releases and CI/CD operations.