crwodsec_web/app/crowdsec_api.py

47 lines
1.4 KiB
Python

import httpx
from typing import List, Dict
import os
API_URL = os.environ["CS_API_URL"] # e.g http://127.0.0.1:8080/v1
API_KEY = os.environ["CS_API_KEY"] # cscli bouncers add <bouncer_name>
MACHINE_ID = os.environ["CS_MACHINE_ID"]
PASSWORD = os.environ["CS_MACHINE_PASSWORD"]
async def list_decisions() -> List[Dict]:
"""
Get all current decisions from CrowdSec API.
"""
async with httpx.AsyncClient() as client:
resp = await client.get(
f"{API_URL}/decisions",
headers={
"X-Api-Key": API_KEY,
"Content-Type": "application/json",
},
)
resp.raise_for_status()
decisions = resp.json() or []
decisions = [d for d in decisions if d["origin"] != "CAPI"]
return decisions
async def delete_decision(ip: str) -> None:
"""
Delete a decision by IP.
"""
async with httpx.AsyncClient() as client:
login = await client.post(
f"{API_URL}/watchers/login",
json={
"machine_id": MACHINE_ID,
"password": PASSWORD,
},
headers={"Content-Type": "application/json"},
)
response = login.json()
token = response["token"]
resp = await client.delete(
f"{API_URL}/decisions?ip={ip}", headers={"Authorization": f"Bearer {token}"}
)
resp.raise_for_status()