معرفی شرکت ها


fastapi-cloud-tasks-0.1.1rc1


Card image cap
تبلیغات ما

مشتریان به طور فزاینده ای آنلاین هستند. تبلیغات می تواند به آنها کمک کند تا کسب و کار شما را پیدا کنند.

مشاهده بیشتر
Card image cap
تبلیغات ما

مشتریان به طور فزاینده ای آنلاین هستند. تبلیغات می تواند به آنها کمک کند تا کسب و کار شما را پیدا کنند.

مشاهده بیشتر
Card image cap
تبلیغات ما

مشتریان به طور فزاینده ای آنلاین هستند. تبلیغات می تواند به آنها کمک کند تا کسب و کار شما را پیدا کنند.

مشاهده بیشتر
Card image cap
تبلیغات ما

مشتریان به طور فزاینده ای آنلاین هستند. تبلیغات می تواند به آنها کمک کند تا کسب و کار شما را پیدا کنند.

مشاهده بیشتر
Card image cap
تبلیغات ما

مشتریان به طور فزاینده ای آنلاین هستند. تبلیغات می تواند به آنها کمک کند تا کسب و کار شما را پیدا کنند.

مشاهده بیشتر

توضیحات

Trigger delayed Cloud Tasks from FastAPI
ویژگی مقدار
سیستم عامل -
نام فایل fastapi-cloud-tasks-0.1.1rc1
نام fastapi-cloud-tasks
نسخه کتابخانه 0.1.1rc1
نگهدارنده []
ایمیل نگهدارنده []
نویسنده -
ایمیل نویسنده -
آدرس صفحه اصلی -
آدرس اینترنتی https://pypi.org/project/fastapi-cloud-tasks/
مجوز -
# FastAPI Cloud Tasks Strongly typed background tasks with FastAPI and Google CloudTasks. ## Installation ``` pip install fastapi-cloud-tasks ``` ## Key features - Strongly typed tasks. - Fail at invocation site to make it easier to develop and debug. - Breaking schema changes between versions will fail at task runner with Pydantic. - Familiar and simple public API - `.delay` method that takes same arguments as the task. - `.scheduler` method to create recurring job. - Tasks are regular FastAPI endpoints on plain old HTTP. - `Depends` just works! - All middlewares, telemetry, auth, debugging etc solutions for FastAPI work as is. - Host task runners independent of GCP. If CloudTasks can reach the URL, it can invoke the task. - Save money. - Task invocation with GCP is [free for first million, then costs $0.4/million](https://cloud.google.com/tasks/pricing). That's almost always cheaper than running a RabbitMQ/Redis/SQL backend for celery. - Jobs cost [$0.1 per job per month irrespective of invocations. 3 jobs are free.](https://cloud.google.com/scheduler#pricing) Either free or almost always cheaper than always running beat worker. - If somehow, this cost ever becomes a concern, the `client` can be overriden to call any gRPC server with a compatible API. [Here's a trivial emulator implementation that we will use locally](https://github.com/aertje/cloud-tasks-emulator) - Autoscale. - With a FaaS setup, your task workers can autoscale based on load. - Most FaaS services have free tiers making it much cheaper than running a celery worker. ## How it works ### Delayed job ```python from fastapi_cloud_tasks import DelayedRouteBuilder delayed_router = APIRouter(route_class=DelayedRouteBuilder(...)) class Recipe(BaseModel): ingredients: List[str] @delayed_router.post("/{restaurant}/make_dinner") async def make_dinner(restaurant: str, recipe: Recipe): # Do a ton of work here. app.include_router(delayed_router) ``` Now we can trigger the task with ```python make_dinner.delay(restaurant="Taj", recipe=Recipe(ingredients=["Pav","Bhaji"])) ``` If we want to trigger the task 30 minutes later ```python make_dinner.options(countdown=1800).delay(...) ``` ### Scheduled Task ```python from fastapi_cloud_tasks import ScheduledRouteBuilder scheduled_router = APIRouter(route_class=ScheduledRouteBuilder(...)) class Recipe(BaseModel): ingredients: List[str] @scheduled_router.post("/home_cook") async def home_cook(recipe: Recipe): # Make my own food app.include_router(scheduled_router) # If you want to make your own breakfast every morning at 7AM IST. home_cook.scheduler(name="test-home-cook-at-7AM-IST", schedule="0 7 * * *", time_zone="Asia/Kolkata").schedule(recipe=Recipe(ingredients=["Milk","Cereal"])) ``` ## Concept [`Cloud Tasks`](https://cloud.google.com/tasks) allows us to schedule a HTTP request in the future. [FastAPI](https://fastapi.tiangolo.com/tutorial/body/) makes us define complete schema and params for an HTTP endpoint. [`Cloud Scheduler`](https://cloud.google.com/scheduler) allows us to schedule recurring HTTP requests in the future. FastAPI Cloud Tasks works by putting the three together: - GCP's Cloud Tasks + FastAPI = Partial replacement for celery's async delayed tasks. - GCP's Cloud Scheduler + FastAPI = Replacement for celery beat. - FastAPI Cloud Tasks + Cloud Run = Autoscaled delayed tasks. ## Running ### Local Pre-requisites: - `pip install fastapi-cloud-tasks` - Install [cloud-tasks-emulator](https://github.com/aertje/cloud-tasks-emulator) - Alternatively install ngrok and forward the server's port Start running the emulator in a terminal ```sh cloud-tasks-emulator ``` Start running the task runner on port 8000 so that it is accessible from cloud tasks. ```sh uvicorn examples.simple.main:app --reload --port 8000 ``` In another terminal, trigger the task with curl ``` curl http://localhost:8000/trigger ``` Check the logs on the server, you should see ``` WARNING: Hello task ran with payload: Triggered task ``` Important bits of code: ```python # complete file: examples/simple/main.py # For local, we connect to the emulator client client = None if IS_LOCAL: client = emulator_client() # Construct our DelayedRoute class with all relevant settings # This can be done once across the entire project DelayedRoute = DelayedRouteBuilder( client=client, base_url="http://localhost:8000" queue_path=queue_path( project="gcp-project-id", location="asia-south1", queue="test-queue", ), ) # Override the route_class so that we can add .delay method to the endpoints and know their complete URL delayed_router = APIRouter(route_class=DelayedRoute, prefix="/delayed") class Payload(BaseModel): message: str @delayed_router.post("/hello") async def hello(p: Payload = Payload(message="Default")): logger.warning(f"Hello task ran with payload: {p.message}") # Define our app and add trigger to it. app = FastAPI() @app.get("/trigger") async def trigger(): # Trigger the task hello.delay(p=Payload(message="Triggered task")) return {"message": "Hello task triggered"} app.include_router(delayed_router) ``` Note: You can read complete working source code of the above example in [`examples/simple/main.py`](examples/simple/main.py) In the real world you'd have a separate process for task runner and actual task. ### Deployed environment / Cloud Run Running on Cloud Run with authentication needs us to supply an OIDC token. To do that we can use a `hook`. Pre-requisites: - Create a task queue. Copy the project id, location and queue name. - Deploy the worker as a service on Cloud Run and copy it's URL. - Create a service account in cloud IAM and add `Cloud Run Invoker` role to it. ```python # URL of the Cloud Run service base_url = "https://hello-randomchars-el.a.run.app" DelayedRoute = DelayedRouteBuilder( base_url=base_url, # Task queue, same as above. queue_path=queue_path(...), pre_create_hook=oidc_task_hook( token=tasks_v2.OidcToken( # Service account that you created service_account_email="fastapi-cloud-tasks@gcp-project-id.iam.gserviceaccount.com", audience=base_url, ), ), ) ``` Check the fleshed out example at [`examples/full/tasks.py`](examples/full/tasks.py) If you're not running on CloudRun and want to an OAuth Token instead, you can use the `oauth_task_hook` instead. Check [fastapi_cloud_tasks/hooks.py](fastapi_cloud_tasks/hooks.py) to get the hang od hooks and how you can use them. ## Configuration ### DelayedRouteBuilder Usage: ```python DelayedRoute = DelayedRouteBuilder(...) delayed_router = APIRouter(route_class=DelayedRoute) @delayed_router.get("/simple_task") def simple_task(): return {} ``` - `base_url` - The URL of your worker FastAPI service. - `queue_path` - Full path of the Cloud Tasks queue. (Hint: use the util function `queue_path`) - `task_create_timeout` - How long should we wait before giving up on creating cloud task. - `pre_create_hook` - If you need to edit the `CreateTaskRequest` before sending it to Cloud Tasks (eg: Auth for Cloud Run), you can do that with this hook. See hooks section below for more. - `client` - If you need to override the Cloud Tasks client, pass the client here. (eg: changing credentials, transport etc) #### Task level default options Usage: ```python @delayed_router.get("/simple_task") @task_default_options(...) def simple_task(): return {} ``` All options from above can be passed as `kwargs` to the decorator. Additional options: - `countdown` - Seconds in the future to schedule the task. - `task_id` - named task id for deduplication. (One task id will only be queued once.) Example: ```python # Trigger after 5 minutes @delayed_router.get("/simple_task") @task_default_options(countdown=300) def simple_task(): return {} ``` #### Delayer Options Usage: ```python simple_task.options(...).delay() ``` All options from above can be overriden per call (including DelayedRouteBuilder options like `base_url`) with kwargs to the `options` function before calling delay. Example: ```python # Trigger after 2 minutes simple_task.options(countdown=120).delay() ``` ### ScheduledRouteBuilder Usage: ```python ScheduledRoute = ScheduledRouteBuilder(...) scheduled_router = APIRouter(route_class=ScheduledRoute) @scheduled_router.get("/simple_scheduled_task") def simple_scheduled_task(): return {} simple_scheduled_task.scheduler(name="simple_scheduled_task", schedule="* * * * *").schedule() ``` ## Hooks We might need to override things in the task being sent to Cloud Tasks. The `pre_create_hook` allows us to do that. Some hooks are included in the library. - `oidc_delayed_hook` / `oidc_scheduled_hook` - Used to pass OIDC token (for Cloud Run etc). - `deadline_delayed_hook` / `deadline_scheduled_hook` - Used to change the timeout for the worker of a task. (PS: this deadline is decided by the sender to the queue and not the worker) - `chained_hook` - If you need to chain multiple hooks together, you can do that with `chained_hook(hook1, hook2)` ## Helper dependencies ### max_retries ```python @delayed_router.post("/fail_twice", dependencies=[Depends(max_retries(2))]) async def fail_twice(): raise Exception("nooo") ``` ### CloudTasksHeaders ```python @delayed_router.get("/my_task") async def my_task(ct_headers: CloudTasksHeaders = Depends()): print(ct_headers.queue_name) ``` Check the file [fastapi_cloud_tasks/dependencies.py](fastapi_cloud_tasks/dependencies.py) for details. ## Contributing - Run `pre-commit install` on your local to get pre-commit hook. - Make changes and raise a PR! - If the change is massive, open an issue to discuss it before writing code. Note: This project is neither affiliated with, nor sponsored by Google.


نیازمندی

مقدار نام
- google-cloud-tasks
- google-cloud-scheduler
- fastapi


نحوه نصب


نصب پکیج whl fastapi-cloud-tasks-0.1.1rc1:

    pip install fastapi-cloud-tasks-0.1.1rc1.whl


نصب پکیج tar.gz fastapi-cloud-tasks-0.1.1rc1:

    pip install fastapi-cloud-tasks-0.1.1rc1.tar.gz