# FastSTAN
Easily deploy NATS and NATS Streaming subscribers using Python.
## Features
- Define subscribers using sync and async Python functions
- Automatic data parsing and validation using type annotations and pydantic
- Support all subscription configuration options available in stan.py and nats.py
- Start subscriptions or services from command line
- Publish messages from command line
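
To picture how the first two features fit together, here is a minimal standard-library sketch. It is not FastSTAN's implementation: `dispatch` is a hypothetical helper, and a dataclass stands in for a pydantic model so the snippet has no dependencies. It reads the type annotation of a handler's first parameter, builds that type from a JSON payload, and calls the handler whether it is sync or async.

```python
import asyncio
import inspect
import json
from dataclasses import dataclass


@dataclass
class NewEvent:
    # Stand-in for a pydantic model; a dataclass keeps the sketch stdlib-only.
    name: str
    datetime: int


def sync_handler(event: NewEvent) -> str:
    return f"sync got {event.name}"


async def async_handler(event: NewEvent) -> str:
    await asyncio.sleep(0)
    return f"async got {event.name}"


async def dispatch(handler, raw: bytes):
    """Hypothetical helper: decode a message and call a sync or async handler."""
    # The annotation of the first parameter tells us which type to build.
    first_param = next(iter(inspect.signature(handler).parameters.values()))
    model = first_param.annotation
    event = model(**json.loads(raw))
    result = handler(event)
    # Calling an async function returns an awaitable that still has to be awaited.
    if inspect.isawaitable(result):
        result = await result
    return result


async def main():
    raw = b'{"name": "John Doe", "datetime": 1602661983}'
    return [await dispatch(sync_handler, raw), await dispatch(async_handler, raw)]


results = asyncio.run(main())
print(results)
```

Unlike pydantic, the dataclass stand-in does not check field types; it only shows how an annotation can select the type to parse into.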
## Quick start
- Install the package from PyPI:

      pip install faststan

### Using the command line
Create your first NATS subscriber:
- Create a file named `app.py` and write the following lines:

      from pydantic import BaseModel


      class NewEvent(BaseModel):
          name: str
          datetime: int


      def on_event(event: NewEvent):
          print(f"INFO :: Received new message: {event}")

- Start your subscriber:

      nats sub start demo --function app:on_event

- Publish a message:

      nats pub demo --name "John Doe" --datetime 1602661983

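
The validation step behind this exchange can be sketched with pydantic alone. The snippet assumes the command-line flags are sent as the fields of a JSON object; that wire format is an assumption, and `parse_event` is a hypothetical helper rather than part of FastSTAN.

```python
import json

from pydantic import BaseModel, ValidationError


class NewEvent(BaseModel):
    name: str
    datetime: int


def parse_event(raw: bytes) -> NewEvent:
    """Hypothetical helper: decode a JSON payload and validate it against NewEvent."""
    return NewEvent(**json.loads(raw))


# Assumed JSON equivalent of: --name "John Doe" --datetime 1602661983
event = parse_event(b'{"name": "John Doe", "datetime": 1602661983}')
print(event.name, event.datetime)

# A payload with a non-numeric datetime fails validation.
try:
    parse_event(b'{"name": "John Doe", "datetime": "yesterday"}')
    rejected = False
except ValidationError as exc:
    rejected = True
    print(f"Rejected with {len(exc.errors())} validation error(s)")
```

Validating at the boundary means a handler such as `on_event` only ever receives a well-formed `NewEvent`.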
NATS Streaming behaves the same way:
- Define your subscription in `app.py`, keeping the `NewEvent` model from the previous example:

      from pydantic import BaseModel


      class Greetings(BaseModel):
          message: str


      def on_event(event: NewEvent) -> Greetings:
          print("Info :: Received new request.")
          return Greetings(message=f"Welcome to {event.name}!")

- Start it using the `stan sub start` command:

      stan sub start demo --function app:on_event

- Publish a message using the `stan pub` command (`NewEvent` requires both fields):

      stan pub demo --name "John Doe" --datetime 1602661983

### Using Python API
In this example, we will build a machine learning service that performs predictions using an ONNX model. This service will be implemented using the request/reply pattern.
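
As background, the request/reply pattern can be illustrated without a NATS server. The sketch below is a toy, in-process stand-in: `InMemoryBus` and its methods are hypothetical names, not part of nats.py or FastSTAN, and the responder returns a placeholder answer instead of a real prediction. The requester creates a single-use reply subject, publishes its request with that subject attached, and waits for the responder to publish an answer on it.

```python
import asyncio
import json
from collections import defaultdict
from itertools import count


class InMemoryBus:
    """Toy stand-in for a message broker: routes messages by subject."""

    def __init__(self):
        self._handlers = defaultdict(list)
        self._inbox_ids = count(1)

    def subscribe(self, subject, handler):
        self._handlers[subject].append(handler)

    async def publish(self, subject, data, reply=None):
        for handler in self._handlers[subject]:
            await handler(data, reply)

    async def request(self, subject, data, timeout=1.0):
        # Create a unique, single-use reply subject (an "inbox").
        inbox = f"_INBOX.{next(self._inbox_ids)}"
        future = asyncio.get_running_loop().create_future()

        async def on_reply(payload, _reply):
            if not future.done():
                future.set_result(payload)

        self.subscribe(inbox, on_reply)
        await self.publish(subject, data, reply=inbox)
        try:
            return await asyncio.wait_for(future, timeout)
        finally:
            # The inbox is only needed for this one exchange.
            del self._handlers[inbox]


async def main():
    bus = InMemoryBus()

    async def predict(data, reply):
        # Responder: build a placeholder answer and publish it on the reply subject.
        event = json.loads(data)
        answer = {"labels": [len(event["values"])], "timestamp": event["timestamp"]}
        await bus.publish(reply, json.dumps(answer).encode())

    bus.subscribe("predict", predict)
    request = {"values": [[0, 0, 0, 0]], "timestamp": 1602661983}
    raw = await bus.request("predict", json.dumps(request).encode())
    return json.loads(raw)


reply = asyncio.run(main())
print(reply)
```

This sketch needs only the standard library; the dependencies listed next apply to the ONNX service itself.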
Before running the example, make sure you have the dependencies installed:
- `onnxruntime`
- `numpy`
- `httpx`

The service is defined as follows:

    import asyncio
    from typing import Dict, List

    import numpy as np
    import onnxruntime as rt
    from faststan.nats import FastNATS
    from httpx import AsyncClient
    from pydantic import BaseModel, validator


    async def load_predictor(
        app: FastNATS,
        url: str = "https://s3-per-grenoble.ams3.digitaloceanspaces.com/models/rf_iris.onnx",
    ) -> None:
        """Load an ONNX model and store a predictor for it in the application state."""
        # Download the serialized model.
        async with AsyncClient() as http_client:
            http_response = await http_client.get(url)

        # Build an inference session from the downloaded bytes.
        sess = rt.InferenceSession(http_response.content)
        input_name = sess.get_inputs()[0].name
        label_name = sess.get_outputs()[0].name
        proba_name = sess.get_outputs()[1].name

        def predict(data: np.ndarray):
            """Perform prediction for given data."""
            return sess.run([label_name, proba_name], {input_name: data})

        app.state["predictor"] = predict


    class Event(BaseModel):
        """Incoming data expected by the predictor."""

        values: np.ndarray
        timestamp: int

        @validator("values", pre=True)
        def validate_array(cls, value):
            """Cast data to a numpy array with float32 precision.

            A ValidationError will be raised if any error is raised in this function.
            """
            return np.array(value, dtype=np.float32)

        class Config:
            # This must be set to True in order to let pydantic handle numpy types
            arbitrary_types_allowed = True


    class Result(BaseModel):
        """Result returned by the predictor."""

        # Example: [{0: 0.25, 1: 0.75}, {0: 0.15, 1: 0.85}]
        probabilities: List[Dict[int, float]]
        # Example: [1, 1]
        labels: List[int]


    # Top-level `await` only works in an asyncio-aware REPL (for example
    # `python -m asyncio` or IPython). In a script, wrap these calls in a coroutine.
    app = FastNATS()
    app.state = {}
    await load_predictor(app)
    await app.connect()


    # Request handler: receives a validated Event and returns data matching Result.
    def predict(event: Event) -> Result:
        print(f"{event.timestamp} :: Received new event data")
        labels, probas = app.state["predictor"](event.values)
        return {"probabilities": probas, "labels": labels.tolist()}


    await app.start()

- You can now send requests to the service:

      from faststan import FastNATS

      # Run this inside a coroutine or an asyncio-aware REPL.
      async with FastNATS() as nats_client:
          reply_msg = await nats_client.request_json(
              "predict", {"values": [[0, 0, 0, 0]], "timestamp": 1602661983}
          )
          print(f"Received a reply: {reply_msg}")

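
One detail to keep in mind when reading the reply: if it travels as JSON, as the name `request_json` suggests (an assumption; the wire format is not described here), the integer keys of `probabilities` arrive as strings, because JSON object keys are always strings. A minimal standard-library sketch of the round trip, using made-up probability values:

```python
import json

# Shape of the `Result` model: class index -> probability, plus labels.
# The numbers are illustrative, not real model output.
result = {"probabilities": [{0: 0.25, 1: 0.75}], "labels": [1]}

# Integer keys are converted to strings during JSON encoding.
encoded = json.dumps(result)
decoded = json.loads(encoded)
print(decoded["probabilities"])

# Restore integer class indices on the client side.
probabilities = [
    {int(label): proba for label, proba in row.items()}
    for row in decoded["probabilities"]
]
print(probabilities)
```

Converting the keys back with `int()` on the client keeps lookups such as `probabilities[0][1]` working as they would with the original dictionary.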