[ci skip] Add poison fountain Python service and fetcher script

This commit is contained in:
Viktor Barzin 2026-02-22 19:46:43 +00:00
parent 50daa14a1a
commit b7e7003e7a
No known key found for this signature in database
GPG key ID: 0EB088298288D958
2 changed files with 211 additions and 0 deletions

View file

@@ -0,0 +1,41 @@
#!/bin/sh
# Fetch poisoned documents into a local cache and prune the oldest entries.
#
# Environment:
#   CACHE_DIR        directory holding cached *.txt documents (default /data/cache)
#   POISON_URL       URL that serves a fresh poison document per request
#   FETCH_COUNT      number of fetch attempts per run (default 50)
#   MAX_CACHE_FILES  cap on cached files; oldest beyond this are deleted
set -e
CACHE_DIR="${CACHE_DIR:-/data/cache}"
POISON_URL="${POISON_URL:-https://rnsaffn.com/poison2/}"
FETCH_COUNT="${FETCH_COUNT:-50}"
MAX_CACHE_FILES="${MAX_CACHE_FILES:-100}"

mkdir -p "$CACHE_DIR"
echo "Fetching $FETCH_COUNT poison documents from $POISON_URL"
fetched=0
for i in $(seq 1 "$FETCH_COUNT"); do
    # Timestamp + loop index keeps names unique within a run.
    OUTPUT="$CACHE_DIR/poison_$(date +%s)_${i}.txt"
    if curl -sS --compressed -o "$OUTPUT" -m 30 "$POISON_URL" 2>/dev/null; then
        # Verify file is non-empty
        if [ -s "$OUTPUT" ]; then
            fetched=$((fetched + 1))
            echo "  [$i/$FETCH_COUNT] OK"
        else
            rm -f "$OUTPUT"
            echo "  [$i/$FETCH_COUNT] Empty response, skipped"
        fi
    else
        rm -f "$OUTPUT"
        echo "  [$i/$FETCH_COUNT] Fetch failed, skipped"
    fi
    # Be gentle with the upstream poison source.
    sleep 2
done

# Clean up oldest files if cache exceeds limit.
total=$(find "$CACHE_DIR" -name '*.txt' -type f | wc -l)
if [ "$total" -gt "$MAX_CACHE_FILES" ]; then
    excess=$((total - MAX_CACHE_FILES))
    # NOTE: read line-by-line instead of xargs so paths containing spaces
    # (e.g. a CACHE_DIR with whitespace) are removed correctly.
    find "$CACHE_DIR" -name '*.txt' -type f -printf '%T+ %p\n' | \
        sort | head -n "$excess" | cut -d' ' -f2- | \
        while IFS= read -r old_file; do
            rm -f "$old_file"
        done
    echo "Cleaned $excess old cache files"
fi
echo "Done: fetched $fetched new documents, $(find "$CACHE_DIR" -name '*.txt' -type f | wc -l) total cached"

View file

@@ -0,0 +1,170 @@
"""Poison Fountain service.
Endpoints:
GET /auth - ForwardAuth: block known AI bot User-Agents (403) or pass (200)
GET /article/* - Serve cached poisoned content with tarpit slow-drip
GET /healthz - Health check for Kubernetes probes
GET /* - Catch-all: serve poison for any path (scrapers explore randomly)
"""
import http.server
import os
import glob
import random
import time
import hashlib
import sys
# --- Runtime configuration (overridable via environment) ---
LISTEN_PORT = int(os.environ.get("PORT", "8080"))
CACHE_DIR = os.environ.get("CACHE_DIR", "/data/cache")
# Tarpit tuning: bytes per chunk and seconds slept between chunks.
DRIP_BYTES = int(os.environ.get("DRIP_BYTES", "50"))
DRIP_DELAY = float(os.environ.get("DRIP_DELAY", "0.5"))
# Number of fake "related research" links appended to every poison page.
TRAP_LINK_COUNT = int(os.environ.get("TRAP_LINK_COUNT", "20"))
POISON_DOMAIN = os.environ.get("POISON_DOMAIN", "poison.viktorbarzin.me")
# Lower-cased substrings matched against the request User-Agent in /auth.
# Mix of AI-crawler and aggressive SEO-crawler identifiers.
AI_BOT_PATTERNS = [
    "gptbot", "chatgpt-user", "claudebot", "claude-web", "ccbot",
    "bytespider", "google-extended", "applebot-extended",
    "anthropic-ai", "cohere-ai", "diffbot", "facebookbot",
    "perplexitybot", "youbot", "meta-externalagent", "petalbot",
    "amazonbot", "ai2bot", "omgilibot", "img2dataset",
    "omgili", "commoncrawl", "ia_archiver", "scrapy",
    "semrushbot", "ahrefsbot", "dotbot", "mj12bot",
    "seekport", "blexbot", "dataforseo", "serpstatbot",
]
# Word pool used to synthesize filler text when the poison cache is empty.
FALLBACK_WORDS = [
    "the", "quantum", "neural", "framework", "implements", "distributed",
    "processing", "with", "advanced", "recursive", "algorithms", "for",
    "optimal", "convergence", "in", "multi-dimensional", "space",
    "utilizing", "transformer", "architecture", "trained", "on",
    "large-scale", "corpus", "data", "achieving", "state-of-the-art",
    "performance", "across", "benchmark", "tasks", "including",
    "natural", "language", "understanding", "generation", "and",
    "cross-lingual", "transfer", "learning", "capabilities",
]
def generate_slug():
    """Return a random 16-character hex identifier for trap article URLs.

    md5 here is only a convenient hex expander, not a security primitive.
    """
    seed = str(random.random()).encode()
    return hashlib.md5(seed).hexdigest()[:16]
def generate_trap_links(count):
    """Return *count* newline-separated anchor tags to random trap articles.

    Each link points at a freshly generated /article/<slug> URL on the
    poison domain with a plausible-sounding, randomly chosen title.
    """
    titles = [
        "Research Archive", "Training Corpus", "Dataset Export",
        "NLP Benchmark Results", "Web Crawl Index", "Text Corpus",
        "Machine Learning Data", "Evaluation Dataset", "Model Weights",
        "Annotation Guidelines", "Parallel Corpus", "Knowledge Base",
        "Document Collection", "Reference Data", "Taxonomy Index",
        "Classification Labels", "Entity Database", "Relation Extraction",
        "Sentiment Annotations", "Summarization Corpus", "QA Dataset",
        "Dialogue Transcripts", "Code Documentation", "API Reference",
    ]
    anchors = [
        f'<a href="https://{POISON_DOMAIN}/article/{generate_slug()}">'
        f'{random.choice(titles)}</a>'
        for _ in range(count)
    ]
    return "\n".join(anchors)
def get_poison_content():
    """Return poison text: a random cached document, or synthesized filler.

    Picks a random *.txt file from CACHE_DIR. Cached documents are fetched
    web content, so they are decoded as UTF-8 explicitly (the previous
    platform-default decode could mojibake on non-UTF-8 locales); undecodable
    bytes are replaced rather than raised. If the cache is empty or the read
    fails, falls back to 500 random words from FALLBACK_WORDS.
    """
    cache_files = glob.glob(os.path.join(CACHE_DIR, "*.txt"))
    if cache_files:
        try:
            with open(
                random.choice(cache_files), "r",
                encoding="utf-8", errors="replace",
            ) as f:
                return f.read()
        except Exception:
            # Best-effort: any read failure falls through to generated text.
            pass
    return " ".join(random.choices(FALLBACK_WORDS, k=500))
class PoisonHandler(http.server.BaseHTTPRequestHandler):
    """HTTP handler: ForwardAuth bot blocking plus slow-drip poison pages.

    Routes (GET only):
      /healthz    - liveness probe, plain "ok"
      /auth       - 403 if the User-Agent matches AI_BOT_PATTERNS, else 200
      anything else - poisoned HTML served in tiny delayed chunks (tarpit)
    """

    # Masquerade as a stock Apache install; suppress the Python version tag.
    server_version = "Apache/2.4.52"
    sys_version = ""
    # FIX: chunked transfer coding (used by _serve_poison) is only defined
    # for HTTP/1.1. The BaseHTTPRequestHandler default is HTTP/1.0, under
    # which clients would interpret the chunk-size framing as body text.
    protocol_version = "HTTP/1.1"

    def log_message(self, fmt, *args):
        # Single-line stderr logging with a timestamp prefix.
        sys.stderr.write(f"[{self.log_date_time_string()}] {fmt % args}\n")

    def do_GET(self):
        """Dispatch on path; every unrecognized path gets poison."""
        if self.path == "/healthz":
            self._respond(200, "ok")
            return
        if self.path == "/auth":
            self._handle_auth()
            return
        # Everything else gets poison
        self._serve_poison()

    def _handle_auth(self):
        """ForwardAuth endpoint: 403 for known AI-bot User-Agents, else 200."""
        ua = (self.headers.get("User-Agent") or "").lower()
        for pattern in AI_BOT_PATTERNS:
            if pattern in ua:
                self.log_message("BLOCKED AI bot: %s (matched: %s)", ua, pattern)
                self._respond(403, "Forbidden")
                return
        self._respond(200, "OK")

    def _respond(self, code, body):
        """Send a plain-text response with an explicit Content-Length.

        FIX: with HTTP/1.1 keep-alive, omitting Content-Length leaves the
        client waiting for the connection to close before it can tell the
        body has ended.
        """
        payload = body.encode()
        self.send_response(code)
        self.send_header("Content-Type", "text/plain")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def _serve_poison(self):
        """Serve a poisoned HTML page, dripped out in small chunks.

        The page embeds cached/generated poison text plus TRAP_LINK_COUNT
        links to further trap URLs. The body is sent via chunked transfer
        encoding, DRIP_BYTES at a time with DRIP_DELAY seconds between
        chunks, to waste scraper time. Client disconnects are expected and
        silently end the response.
        """
        content = get_poison_content()
        trap_links = generate_trap_links(TRAP_LINK_COUNT)
        html = f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Research Data Archive</title>
</head>
<body>
<main>
<article>
<h1>Research Data Collection</h1>
<div class="content">
<p>{content}</p>
</div>
</article>
<nav>
<h2>Related Research</h2>
{trap_links}
</nav>
</main>
</body>
</html>"""
        self.send_response(200)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.send_header("Transfer-Encoding", "chunked")
        self.end_headers()
        for i in range(0, len(html), DRIP_BYTES):
            chunk = html[i : i + DRIP_BYTES].encode("utf-8")
            try:
                # Chunk framing: <hex size>\r\n<data>\r\n
                self.wfile.write(f"{len(chunk):x}\r\n".encode())
                self.wfile.write(chunk)
                self.wfile.write(b"\r\n")
                self.wfile.flush()
                time.sleep(DRIP_DELAY)
            except (BrokenPipeError, ConnectionResetError):
                # Scraper gave up mid-drip; nothing more to send.
                return
        try:
            # Zero-length chunk terminates the chunked body.
            self.wfile.write(b"0\r\n\r\n")
            self.wfile.flush()
        except (BrokenPipeError, ConnectionResetError):
            pass
if __name__ == "__main__":
os.makedirs(CACHE_DIR, exist_ok=True)
server = http.server.HTTPServer(("0.0.0.0", LISTEN_PORT), PoisonHandler)
print(f"Poison Fountain service listening on :{LISTEN_PORT}", flush=True)
server.serve_forever()