<h1 align="left">Endpointlib</h1>
<p align="left">MQTT Endpoint library</p>
## Links
- [GitHub](https://github.com/afranco-astro/endpoint-lib "GitHub Repository")
- [PyPI](https://pypi.org/project/endpointlib "PyPI Project")
## How to install
From your virtual environment run:
### `pip install endpointlib`
## How to use
### Basic example
This example creates an endpoint that publishes a random value to the MQTT broker every five seconds:
```python
import asyncio
import random

from endpointlib.endpoint_factory import EndpointFactory
from endpointlib.helpers.loggers.logger_level import LoggerLevel
from endpointlib.helpers.loggers.logger_manager import LoggerManager
from endpointlib.helpers.loggers.logger_settings import LoggerSettings

LoggerManager.create(LoggerSettings.get_debug_settings(level=LoggerLevel.INFO))
logger = LoggerManager.get_async_logger(name='demo')


async def demo():
    # Connect to the broker at localhost:1883 and hand control to main_entry_point
    endpoint = EndpointFactory.basic_endpoint(('localhost', 1883), main_callback=main_entry_point)
    await endpoint.run_forever()


async def main_entry_point(client):
    await logger.info('[main_entry_point]')
    # Async loop: publish a new sample every 5 seconds
    while True:
        # Call other async services here
        data = random.randrange(10, 52, 1)
        data_topic = 'endpoint/data/sample'
        await client.publish(data_topic, data, qos=1, retain=True)
        await logger.info('Data published')
        await asyncio.sleep(5)


asyncio.run(demo(), debug=True)
```
### Advanced example
This example creates an endpoint that monitors a device over a TCP socket. It uses a simple echo TCP server to simulate the device's responses; this server is not included in the package.
```python
import asyncio
import inspect
import os
import sys

# Make the `utils` package (two directories up from this file) importable
currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parentdir = os.path.dirname(os.path.dirname(currentdir))
sys.path.insert(0, parentdir)

from utils.demo_helper import DemoHelper
from endpointlib.endpoint_factory import EndpointFactory
from endpointlib.helpers.loggers.logger_level import LoggerLevel
from endpointlib.helpers.loggers.logger_manager import LoggerManager
from endpointlib.helpers.loggers.logger_settings import LoggerSettings

LoggerManager.create(LoggerSettings.get_debug_settings(level=LoggerLevel.INFO))
logger = LoggerManager.get_async_logger(name='demo')


async def demo():
    # Map each control topic to the coroutine that handles it
    myCallbacks = dict()
    myCallbacks['endpoint/control/device1/operation01'] = on_operation_01
    myCallbacks['endpoint/control/device1/operation02'] = on_operation_02
    broker = ('localhost', 1883)
    cmd = ':MONITOR1234!'
    # (host_to_monitor, port, monitor_interval, command_to_monitor, on_monitor_callback)
    monitor = ('localhost', 10001, 10, cmd, on_monitor)
    # The endpoint is stored as a module-level global so the callbacks below can use it
    global endpoint
    endpoint = EndpointFactory.socket_monitor_endpoint(mqtt_connection=broker, socket_monitor=monitor, handlers=myCallbacks)
    await endpoint.run_forever()


async def on_monitor(status):
    await logger.info('[on_monitor]: ' + status)
    await endpoint.publish('endpoint/control/device1/response/01', '1234')


async def on_operation_01(topic, payload):
    response = await endpoint.send_to_device(':M11!')
    await logger.info('[on_operation_01]Response from device: ' + response)


async def on_operation_02(topic, payload):
    response = await endpoint.send_to_device(':R00!')
    await logger.info('[on_operation_02]Response from device: ' + response)


async def main():
    # DemoHelper starts the simulated device on port 10001 and runs the demo coroutine
    await DemoHelper.run_coro(server_port=10001, coro=demo())


asyncio.run(main(), debug=True)
```
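
Since the echo TCP server used by the advanced example is not shipped with the package, here is a minimal stand-in built only on the standard library's `asyncio` streams. It is a sketch, not the server the author uses: the names `handle_client`, `run_echo_server` and `echo_once` are hypothetical, and it assumes the simulated device simply returns every command it receives.

```python
import asyncio


async def handle_client(reader, writer):
    """Echo every chunk received from the client back unchanged."""
    try:
        while True:
            data = await reader.read(1024)
            if not data:  # empty bytes means the client closed the connection
                break
            writer.write(data)
            await writer.drain()
    finally:
        writer.close()
        await writer.wait_closed()


async def run_echo_server(host='localhost', port=10001):
    """Start the echo server and return the asyncio Server object."""
    return await asyncio.start_server(handle_client, host, port)


async def echo_once(host, port, message):
    """Send one message and return the echoed reply (handy for a quick check)."""
    reader, writer = await asyncio.open_connection(host, port)
    writer.write(message)
    await writer.drain()
    reply = await reader.read(1024)
    writer.close()
    await writer.wait_closed()
    return reply
```

To keep the server running in its own process, await `run_echo_server()` inside a coroutine, enter the returned server with `async with`, and then await its `serve_forever()` method. The default port 10001 matches the monitor tuple in the example above.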
## Development setup
[TODO: Development setup instructions]
## Built With
- Python
## Author
**Alfonso Franco**
- [Profile](https://github.com/afranco-astro "Alfonso Franco")
- [Email](mailto:afranco@astro.unam.mx)