# Data-Quality-Check
[![CI Build](https://github.com/tw-data-quality-2022/spark-profiling/actions/workflows/python-package.yml/badge.svg)](https://github.com/tw-data-quality-2022/spark-profiling/actions/workflows/python-package.yml)
![DQC](./logo.png)
## Requirements
- Python 3.7+
- Java 8+
- Apache Spark 3.0+
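A quick way to confirm the Python and Spark side of these requirements from a running interpreter (a minimal check, assuming PySpark is already installed; Java is checked separately with `java -version`):
```python
import sys
import pyspark

# Requires Python 3.7+ and Apache Spark 3.0+ (see the list above).
assert sys.version_info >= (3, 7), f"Python 3.7+ required, found {sys.version.split()[0]}"
print("PySpark version:", pyspark.__version__)
```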
## Usage
### Installation
```shell
pip install --upgrade data-quality-check
# Install Spark if needed
pip install pyspark
```
### Quick Start
```python
from data_quality_check.config import Config
from data_quality_check.profiler.combined_profiler import CombinedProfiler
from data_quality_check.report.renders.html.render import render_all
config_dict = {
    'dataset': {'name': 'mydb.my_table'},
    'profiling': {
        'general': {'columns': ['*']}
    }
}
config = Config().parse_obj(config_dict)
profiler = CombinedProfiler(spark, config=config)
result = profiler.run()
html = render_all(all_pr=result)
# Present in Jupyter notebooks
from IPython.core.display import display, HTML
display(HTML(html))
# Present in Databricks notebooks
displayHTML(html)
# Save the report to an HTML file
with open("report.html", "w") as f:
    f.write(html)
```
If you do not have a ready-to-use Spark session, use the code below to create one:
```python
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MyApp").enableHiveSupport().getOrCreate()
```
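If you only want to try the profilers locally (for example against DataFrames or temp views, without a Hive metastore), a plain local session is enough. This is a sketch; the master setting and app name are placeholders:
```python
from pyspark.sql import SparkSession

# Local session for experimentation; no Hive metastore required.
spark = (SparkSession.builder
         .master("local[*]")      # run Spark locally on all available cores
         .appName("dqc-local")    # placeholder app name
         .getOrCreate())
```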
## Development
### Dependencies
| Filename | Description |
|----------|-------------|
| requirements.txt | Package requirements |
| requirements-dev.txt | Requirements for development |
### Test
```shell
PYTHONPATH=./src pytest tests/*
```
### Build
```shell
python setup.py sdist bdist_wheel && twine check dist/*
```
### Publish
```shell
twine upload --repository-url https://test.pypi.org/legacy/ dist/*
twine upload dist/*
```
# Manual
## Profiling Check
There are two types of profilers: `GeneralProfiler` and `CustomizedProfiler`.
If you would like to run both on your dataset, you can use `CombinedProfiler`, which runs them together.
### Combined Profiler
The easiest way to run a combined profiler (a mix of the general and customized profilers) on your dataset is shown in the example below.
```python
from data_quality_check.config import Config
from data_quality_check.profiler.combined_profiler import CombinedProfiler
from data_quality_check.report.renders.html.render import render_all
config_dict = {
    'dataset': {'name': 'my_table'},
    'profiling': {
        'general': {'columns': ['*']}
    },
    'customized': {
        'code_check': [
            {'column': 'my_code_col', 'codes': ['A', 'B', 'C', 'D']}
        ]
    }
}
config = Config().parse_obj(config_dict)
profiler = CombinedProfiler(spark, config=config)
result = profiler.run()
html = render_all(all_pr=result)
displayHTML(html)
```
### General Profiler
```python
from pyspark.sql import SparkSession
from data_quality_check.config import ConfigDataset
from data_quality_check.profiler.general_profiler import GeneralProfiler
spark = SparkSession.builder.appName("SparkProfilingApp").enableHiveSupport().getOrCreate()
data = [{'name': 'Alice', 'age': 1, 'gender': 'female', 'is_new': True},
        {'name': 'Tom', 'age': 10, 'gender': 'male', 'is_new': False}]
# Run general check on spark df
df = spark.createDataFrame(data)
result_df = GeneralProfiler(spark, df=df).run(return_type='dataframe')
result_df.show()
# Run general check on spark/hive table
df.createOrReplaceTempView('my_table')
result_df = GeneralProfiler(spark, dataset_config=ConfigDataset(name='my_table')).run(return_type='dataframe')
result_df.show()
```
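Since the general profiler can return its result as a Spark DataFrame, you can also pull it into pandas for ad-hoc inspection. This is plain PySpark, not part of the library's API, and assumes pandas is installed:
```python
# Collect the (small) profiling summary locally for ad-hoc inspection.
profile_pdf = result_df.toPandas()
print(profile_pdf.head())
```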
### Customized Profiler
```python
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType
from data_quality_check.config import Config, ConfigDataset, ConfigProfilingCustomized
from data_quality_check.profiler.customized_profiler import CustomizedProfiler
# Initialize spark
spark = SparkSession.builder.appName("SparkProfilingApp").enableHiveSupport().getOrCreate()
dept = [("Finance", 1),
("Marketing", 2),
("Sales", 3),
("IT", 4)]
deptSchema = StructType([StructField('dept_name', StringType(), True),
StructField('dept_id', LongType(), True)])
spark.createDataFrame(data=dept, schema=deptSchema).createOrReplaceTempView('dept')
print('dept table:')
spark.table('dept').show(truncate=False)
employee = [(1, "Amy", 1, 'male', 1000, 'amy@example.com'),
(2, "Caro", 2, 'male', 1000, 'caro@example.com'),
(3, "Mark", 3, 'Error', 2000, 'unknown'),
(4, "Timi", 4, 'female', 2000, None),
(5, "Tata", 5, 'unknown', 3000, 'bad email address'),
(6, "Zolo", None, None, 3000, 'my-C0omplicated_EMAIL@A.ddress.xyz')]
employeeSchema = StructType([StructField('uid', LongType(), True),
StructField('name', StringType(), True),
StructField('dept_id', LongType(), True),
StructField('gender', StringType(), True),
StructField('income', LongType(), True),
StructField('email', StringType(), True)])
spark.createDataFrame(data=employee, schema=employeeSchema).createOrReplaceTempView('employee')
print('employee table:')
spark.table('employee').show(truncate=False)
# Specify the configuration of customized profiler
customized_config_dict = {
    'code_check': [
        {'column': 'gender', 'codes': ['male', 'female', 'unknown']}
    ],
    'key_mapping_check': [
        {'column': 'dept_id', 'target_table': 'dept', 'target_column': 'dept_id'}
    ]
}
customized_config = ConfigProfilingCustomized.parse_obj(customized_config_dict)
dataset_config = ConfigDataset.parse_obj({'name': 'employee'})
# Initialize CustomizedProfiler with configuration
customized_profiler = CustomizedProfiler(spark,
                                         dataset_config=dataset_config,
                                         customized_profiling_config=customized_config)
result = customized_profiler.run(return_type='dict')
print(json.dumps(result, indent=' ', ensure_ascii=False, allow_nan=True))
```
## Expectation Verification
To be done.
## Supported Checks and Expectations
| Profiler Type | Check Type | Render result as HTML? | Support Expectation? | Description |
|---|---|---|---|---|
| General | Distinct Values Count | YES | Will DO | Number of unique values in a given column; this equals the Unique Row Count |
| General | Null Row Count | YES | Will DO | Null row count in a given column |
| General | Empty Row Count | YES | Will DO | Empty/Blank text row count in a given column |
| General | Zero Row Count | YES | Will DO | 0-valued row count in a given column |
| General | Valued Row Count | YES | Will DO | Number of rows which are not null in a given column |
| General | Total Row Count | YES | Will DO | Number of total rows |
| General | Unique Row Count | YES | Will DO | Number of rows that have unique value |
| General | Duplicated Valued Row Count | YES | Will DO | Number of rows that have duplicated values |
| General | Minimum Value | YES | Will DO | Minimum value |
| General | Maximum Value | YES | Will DO | Maximum value |
| General | Mean Value | YES | Will DO | Mean/average value |
| General | Standard Deviation Value | YES | Will DO | Standard deviation value of a column |
| General | Values Count | YES | Will DO | Number of values in a given column |
| Customized | Code Check | YES | Will DO | Check whether the values in a column fall within the given (expected) list of codes |
| Customized | Key Mapping Check | Will DO | Will DO | Find values in this table's column that do not exist in the target (another) table's column. Hint: the target table is usually a dimension table |

`Will DO` = scheduled to be developed, but not yet implemented.
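For intuition, most of the general checks above boil down to simple Spark aggregations. The sketch below is not the library's implementation; it only illustrates, with plain PySpark, what metrics such as total/null/distinct/min/max/mean/stddev mean for a single column (here the `income` column of the `employee` temp view created in the Customized Profiler example):
```python
from pyspark.sql import functions as F

df = spark.table('employee')  # any Spark DataFrame works; 'employee' comes from the example above
col = 'income'                # illustrative column choice

df.select(
    F.count(F.lit(1)).alias('total_row_count'),                                  # Total Row Count
    F.count(F.col(col)).alias('valued_row_count'),                               # Valued (non-null) Row Count
    F.sum(F.when(F.col(col).isNull(), 1).otherwise(0)).alias('null_row_count'),  # Null Row Count
    F.countDistinct(F.col(col)).alias('distinct_values_count'),                  # Distinct Values Count
    F.min(F.col(col)).alias('min_value'),                                        # Minimum Value
    F.max(F.col(col)).alias('max_value'),                                        # Maximum Value
    F.mean(F.col(col)).alias('mean_value'),                                      # Mean Value
    F.stddev(F.col(col)).alias('stddev_value'),                                  # Standard Deviation Value
).show()
```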
| Expectation Type | Scope | Description |
|---|---|---|
| ExpectColumnToExist | ... | ... |
| ... | ... | ... |
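Expectation verification is not implemented yet, but as an illustration of what an expectation such as `ExpectColumnToExist` amounts to, a hand-rolled check in plain PySpark could look like this (not the library's API):
```python
# Hand-rolled stand-in for "ExpectColumnToExist"; not part of data-quality-check.
def expect_column_to_exist(df, column_name: str) -> bool:
    return column_name in df.columns

assert expect_column_to_exist(spark.table('employee'), 'email')
```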