# Alibaba PAI Python SDK
The AliPAI Python SDK is provided by the PAI team of the Alibaba Cloud computing platform. It makes it convenient for users to access the [PAI service in Alibaba Cloud](https://www.aliyun.com/product/bigdata/product/learn).
Currently, the SDK supports PAIFlow (the ML pipeline service of PAI); other PAI services, such as EAS (Elastic Algorithm Service) and Blade, will be supported soon.
## Installation
To install the PAI SDK, run the following command in a terminal.
```bash
python -m pip install alipai
```
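To check that the installation succeeded, the package can be imported in a Python shell. A version attribute is assumed to exist below, as it does for most packages, but is not guaranteed by this README, so the snippet falls back gracefully.
```python
import pai

# Assumption: the package exposes a `__version__` attribute; if it does not
# in your installed version, a successful import is already a good sign.
print(getattr(pai, "__version__", "unknown"))
```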
## Usage
### Set up the default PAI session
Before using a PAI service via the SDK, developers should initialize the default PAI session by providing their credentials and the `region_id` of the service.
> **Pipeline service of PAI is currently provided in `cn-shanghai` region only**.
```python
from pai.core.session import setup_default_session
session = setup_default_session(
    access_key_id="your_access_key",
    access_key_secret="your_access_secret",
    region_id="your_region_id")
```
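To avoid hard-coding secrets, the credentials can also be read from the environment. The variable names in this sketch are only a convention chosen for the example; the SDK does not look them up automatically.
```python
import os

from pai.core.session import setup_default_session

# The environment variable names below are illustrative, not names the
# SDK itself recognizes; pick whatever convention your deployment uses.
session = setup_default_session(
    access_key_id=os.environ["PAI_ACCESS_KEY_ID"],
    access_key_secret=os.environ["PAI_ACCESS_KEY_SECRET"],
    region_id=os.environ.get("PAI_REGION_ID", "cn-shanghai"))
```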
### Access Pipeline Service
#### Use PipelineTemplate
A `PipelineTemplate` instance includes the definition of a workflow used by the PAI pipeline service. It can be fetched from the remote PAI service or constructed from a local `Pipeline`/`Component`.
A saved pipeline template has a unique `pipeline_id` generated by the pipeline service. A remote pipeline template can be fetched either by its identifier-provider-version triple or by its `pipeline_id`.
PAI provides a list of public pipeline templates that can be used as workflow templates to run directly or as steps to build new pipelines. These templates are accessible by passing the provider `pai.common.ProviderAlibabaPAI` to `PipelineTemplate.list`.
```python
from pai.pipeline import PipelineTemplate
from pai.common import ProviderAlibabaPAI
# Search for pipeline templates provided by `PAI` whose identifier contains `xflow`.
template = next(PipelineTemplate.list(identifier="xflow", provider=ProviderAlibabaPAI))
# view template inputs/outputs.
template
template.inputs
template.outputs
```
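A template saved in the pipeline service can also be fetched directly by its `pipeline_id`. The snippet below is a sketch: it assumes a `PipelineTemplate.get` classmethod that takes the id, which may be named differently in your SDK version.
```python
from pai.pipeline import PipelineTemplate

# Assumption: `PipelineTemplate.get` fetches a saved template by its
# pipeline_id; check your SDK version for the exact method name.
template = PipelineTemplate.get("your_pipeline_id")
print(template.inputs)
```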
After submitting a run job, users can inspect the detailed workflow DAG, execution logs, and outputs of the pipeline by visiting the job detail URL printed to the console.
```python
from pai.common import ProviderAlibabaPAI
from pai.pipeline import PipelineTemplate
# Get a specific template by identifier-provider-version.
template = PipelineTemplate.get_by_identifier(
    identifier="split-xflow-maxCompute", provider=ProviderAlibabaPAI, version="v1")

xflow_execution = {
    "odpsInfoFile": "/share/base/odpsInfo.ini",
    "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
    "logViewHost": "http://logview.odps.aliyun.com",
    "odpsProject": "your_odps_project",
}
# Run the pipeline with the provided arguments.
job = template.run(job_name="demo-split-job", arguments={
    "inputArtifact": "odps://pai_online_project/tables/mnist_data",
    "execution": xflow_execution, "fraction": 0.7}, wait=True)
job.get_outputs()
```
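For a quick look at the results without opening the console, the returned output artifacts can simply be iterated and printed. Nothing beyond the `get_outputs` call from the example above is assumed here.
```python
# Print each output artifact of the finished job; the record layout varies
# across SDK versions, so the raw objects are printed as-is.
for output in job.get_outputs():
    print(output)
```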
### Build a runnable and reusable pipeline
The PAI pipeline service supports nested, user-defined workflows. A composite pipeline can be run by supplying its required arguments, and a saved pipeline template can be used as a step when building a new pipeline.
```python
import uuid

from pai.common import ProviderAlibabaPAI
from pai.pipeline import Pipeline, PipelineStep, PipelineTemplate
# NOTE: the import path of the parameter/artifact types below may differ
# across SDK versions; adjust it to match your installed version.
from pai.pipeline.types import (ArtifactDataType, ArtifactLocationType,
                                ArtifactMetadata, PipelineArtifact,
                                PipelineParameter)


def gen_temp_table(prefix="pai_temp_"):
    # Assumed helper: generates a unique name for an intermediate output
    # table; adapt it to your MaxCompute project conventions.
    return "{}{}".format(prefix, uuid.uuid4().hex)


def create_composite_pipeline():
    # Define the input parameters and artifacts of the pipeline.
    execution_input = PipelineParameter(name="execution", typ=dict)
    cols_to_double_input = PipelineParameter(name="cols_to_double")
    table_input = PipelineArtifact(name="data_source", metadata=ArtifactMetadata(
        data_type=ArtifactDataType.DataSet,
        location_type=ArtifactLocationType.MaxComputeTable))

    # Pipeline step built from a template in the remote PAI service.
    type_transform_step = PipelineStep(
        identifier="type-transform-xflow-maxCompute", provider=ProviderAlibabaPAI,
        version="v1", name="typeTransform", inputs={
            "inputArtifact": table_input, "execution": execution_input,
            "outputTable": gen_temp_table(), "cols_to_double": cols_to_double_input,
        }
    )

    split_template = PipelineTemplate.get_by_identifier(
        identifier="split-xflow-maxCompute", provider=ProviderAlibabaPAI, version="v1")
    split_step = split_template.as_step(inputs={
        "inputArtifact": type_transform_step.outputs[0],
        "execution": execution_input, "output1TableName": gen_temp_table(),
        "fraction": 0.5, "output2TableName": gen_temp_table(),
    })

    # Initialize the pipeline instance by specifying its steps and outputs.
    p = Pipeline(
        steps=[split_step],
        outputs=split_step.outputs[:2],
    )
    return p


p = create_composite_pipeline()

# Run the pipeline with the required arguments; `xflow_execution` is the
# dict defined in the previous example.
pipeline_run = p.run(job_name="demo-composite-pipeline-run", arguments={
    "execution": xflow_execution,
    "cols_to_double": "time,hour,pm2,pm10,so2,co,no2",
    "data_source": "odps://pai_online_project/tables/wumai_data",
}, wait=True)

# Save the pipeline so it can be reused as a template.
p.save(identifier="demo-composite-pipeline", version="v1")
```
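After `save`, the composite pipeline is itself a template and can be fetched back by identifier, for example to reuse it as a step in yet another pipeline. The sketch below omits the `provider` argument on the assumption that it defaults to the current account; pass it explicitly if your SDK version requires it.
```python
from pai.pipeline import PipelineTemplate

# Fetch the saved composite pipeline back as a template. Assumption: when
# `provider` is omitted, it defaults to the current account; otherwise
# pass your account's provider id explicitly.
saved_template = PipelineTemplate.get_by_identifier(
    identifier="demo-composite-pipeline", version="v1")
print(saved_template.inputs)
```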