read slack notification webhook url from env

This commit is contained in:
Viktor Barzin 2025-07-26 13:15:21 +00:00
parent 4e7734d327
commit 42ed20b833
No known key found for this signature in database
GPG key ID: 4056458DBDBF8863

View file

@ -2,16 +2,17 @@ from abc import abstractmethod
from enum import StrEnum from enum import StrEnum
import apprise import apprise
from functools import lru_cache from functools import lru_cache
import os
class Surface: class Surface:
@abstractmethod @abstractmethod
def connection_string(self) -> str | None: def connection_string(self) -> str | None: ...
...
class Slack(Surface): class Slack(Surface):
def connection_string(self) -> str | None: def connection_string(self) -> str | None:
return "https://hooks.slack.com/services/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" return os.environ.get("SLACK_WEBHOOK_URL")
@lru_cache(maxsize=None) @lru_cache(maxsize=None)
@ -19,9 +20,11 @@ def get_notifier(surfaces: list[Surface] | None = None) -> apprise.Apprise:
surfaces = surfaces or [Slack()] surfaces = surfaces or [Slack()]
obj = apprise.Apprise() obj = apprise.Apprise()
for surface in surfaces: for surface in surfaces:
obj.add(surface.connection_string()) if conn := surface.connection_string():
obj.add(conn)
return obj return obj
async def send_notification( body: str, title: str='') -> bool:
async def send_notification(body: str, title: str = "") -> bool:
notifier = get_notifier() notifier = get_notifier()
return await notifier.async_notify(body=body, title=title) return await notifier.async_notify(body=body, title=title)