Atelier
=======
A library for developing fabrique ML pipelines.
You can copy example pipeline projects from this location:
.. code:: python
import fabrique_atelier
print(fabrique_atelier.__path__[0] + '/examples')
Model decomposition steps
-------------------------
You can find the full decomposed example described below in /examples/fake_lightgbm.
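
If you want a local copy to experiment with, here is a minimal sketch; the destination path ./fake_lightgbm is only an illustrative choice:

.. code:: python

    import shutil
    import fabrique_atelier

    # locate the examples bundled with the installed package
    examples_dir = fabrique_atelier.__path__[0] + '/examples'

    # copy the fake_lightgbm example into the current working directory
    shutil.copytree(examples_dir + '/fake_lightgbm', './fake_lightgbm')
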
1. We have samples and model inference code
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. code:: python
# load samples
import os
samp_dir = './samples'
filenames = [f for f in os.listdir(samp_dir) if os.path.isfile(f'{samp_dir}/{f}')]
samples = []
for filename in filenames:
with open(f'{samp_dir}/{filename}') as fp:
samples.append(fp.read())
Each sample is a JSON string:
.. code:: json
{
"ts": 1622637967.6218433,
"uid": 147757848,
"number": "00000125",
"type": "out",
"traffic": {
"about": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
"services": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
"contacts": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
"roaming": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
"tariffs": [14, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
"simcards": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
"balance": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
"internet": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
"messaging": [0, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0],
"support": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]}
}
.. code:: python
# load model
import lightgbm as lgb
bst = lgb.Booster(model_file='../model.txt') # init model
classes = ["about", "services", "contacts", "roaming", "tariffs",
"simcards", "balance", "internet", "messaging", "support"]
.. code:: python
# simple inference code
import json
import numpy as np
reference_results = []  # we will use these results for tests
for sample in samples:
## 1. extract features from json message
# 1.1. parse message and get features
mes = json.loads(sample)
features = np.array([mes['traffic'][cls] for cls in classes]).flatten()
## 2. make prediction result
# 2.1 get prediction
pred_vals = bst.predict([features, ])[0]
# 2.2 normalize scores, get class
scores = pred_vals/pred_vals.sum()
reason = classes[scores.argmax()]
# 2.3 make and serialize message
scores_dict = {cls: round(scores[i], 2) for i, cls in enumerate(classes)}
res = dict(ts=mes['ts'], uid=mes['uid'], number=mes['number'], classes=scores_dict, reason=reason)
reference_results.append(json.dumps(res, indent=2))
2. Split inference into feature extraction and lightgbm prediction
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. code:: python
# split inference code
splitted_results = []
feature_msgs = []
for sample in samples:
## 1. extract features from json message
# 1.1. parse message and get features
mes = json.loads(sample)
features = np.array([mes['traffic'][cls] for cls in classes]).flatten()
# 1.2. create intermediate message
data = dict(ts=mes['ts'], uid=mes['uid'], number=mes['number'], features=features.tolist())
feature_msgs.append(data)
## 2. make prediction result
features_batch = np.array([data['features'] for data in feature_msgs])
# 2.1 get prediction
pred_batch = bst.predict(features_batch) # microbatch prediction
for i, mes in enumerate(feature_msgs):
scores = pred_batch[i]
# 2.2 normalize scores, get class
try:
scores = scores / scores.sum() # try to normalize
except:
pass
reason_num = scores.argmax()
reason = classes[reason_num]
# 2.3 make and serialize message
scores_dict = {cls: round(scores[i], 2) for i, cls in enumerate(classes)}
res = dict(ts=mes['ts'], uid=mes['uid'], number=mes['number'], classes=scores_dict, reason=reason)
splitted_results.append(json.dumps(res, indent=2))
Check that the split-inference results match the reference results:
.. code:: python
assert set(splitted_results) == set(reference_results)
3. Make pipeline.py
~~~~~~~~~~~~~~~~~~~
.. code:: python
from fabrique_atelier.actors import Pipeline, Processor
import json
import numpy as np
import lightgbm as lgb
class ExtractFeatures(Processor):
def __init__(self):
self.classes = ["about", "services", "contacts", "roaming", "tariffs",
"simcards", "balance", "internet", "messaging", "support"]
def get_result(self, body):
## 1. extract features from json message
# 1.1. parse message and get features
mes = json.loads(body['data'])
features = np.array([mes['traffic'][cls] for cls in self.classes]).flatten()
# 1.2. create intermediate message
data = dict(ts=mes['ts'], uid=mes['uid'], number=mes['number'], features=features.tolist())
return {'data': data}
class ScoringModel(Processor):
def __init__(self):
self.classes = ["about", "services", "contacts", "roaming", "tariffs",
"simcards", "balance", "internet", "messaging", "support"]
self.bst = lgb.Booster(model_file='./model.txt') # init model
def get_batch_result(self, batch):
## 2. make prediction result
features_batch = np.array([body['data']['features'] for body in batch])
# 2.1 get prediction
pred_batch = self.bst.predict(features_batch) # microbatch prediction
out_batch = []
for i, body in enumerate(batch):
in_data = body['data']
scores = pred_batch[i]
# 2.2 normalize scores, get class
try:
scores = scores / scores.sum() # try to normalize
except:
pass
reason_num = scores.argmax()
reason = self.classes[reason_num]
# 2.3 make and serialize message
scores_dict = {cls: round(scores[i], 2) for i, cls in enumerate(self.classes)}
out_data = dict(ts=in_data['ts'], uid=in_data['uid'], number=in_data['number'],
classes=scores_dict, reason=reason)
out_body = dict(data=json.dumps(out_data).encode(), metrics={"reason_num": int(reason_num)})
out_batch.append(out_body)
return out_batch
# topology
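# message flow: extractor -> model -> end (the pipeline output)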
pipeline = Pipeline(['extractor', 'model'])
ids = pipeline.ids
nodes = pipeline.nodes
nodes.extractor = ExtractFeatures.to(ids.model)
nodes.model = ScoringModel.to(ids.end)
4. Run emulation and check results
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Run ./dev/start_pipeline.py with the following code:
.. code:: python
from pipeline import pipeline
# load samples
## code from first decomposition step above ##
# start emulation
pipeline.start_emulation(samples)
# simple inference code
## code from first decomposition step above ##
# load results of emulation
result_dir = './out/data'
filenames = [f for f in os.listdir(result_dir) if os.path.isfile(f'{result_dir}/{f}')]
results = []
for filename in filenames:
with open(f'{result_dir}/{filename}') as fp:
results.append(fp.read())
# check that the emulation results match the reference results
assert set(results) == set(reference_results)