[ci skip] Move Terraform modules into stack directories

Move all 88 service modules (66 individual + 22 platform) from
modules/kubernetes/<service>/ into their corresponding stack directories:

- Service stacks: stacks/<service>/module/
- Platform stack: stacks/platform/modules/<service>/

This collocates module source code with its Terragrunt definition.
Only shared utility modules remain in modules/kubernetes/:
ingress_factory, setup_tls_secret, dockerhub_secret, oauth-proxy.

All cross-references to shared modules updated to use correct
relative paths. Verified with terragrunt run --all -- plan:
0 adds, 0 destroys across all 68 stacks.
This commit is contained in:
Viktor Barzin 2026-02-22 14:38:14 +00:00
parent 73cb696f12
commit e225e81ebf
No known key found for this signature in database
GPG key ID: 0EB088298288D958
614 changed files with 12075 additions and 352 deletions

View file

@ -0,0 +1,38 @@
package extractor
import (
"log"
"os/exec"
)
const maxConcurrentSessions = 10
var sessionSem chan struct{}
// Init starts dbus, PulseAudio, and prepares the session semaphore.
func Init() {
// Start dbus (Chrome needs it for accessibility/service queries)
if err := exec.Command("mkdir", "-p", "/var/run/dbus").Run(); err == nil {
if err := exec.Command("dbus-daemon", "--system", "--nofork").Start(); err != nil {
log.Printf("extractor: warning: failed to start dbus: %v", err)
}
}
if err := exec.Command("pulseaudio", "--start", "--exit-idle-time=-1").Run(); err != nil {
log.Printf("extractor: warning: failed to start PulseAudio: %v", err)
}
// Create a null-sink as the default audio target for all sessions
exec.Command("pactl", "load-module", "module-null-sink",
"sink_name=virtual_sink",
"sink_properties=device.description=VirtualSink").Run()
exec.Command("pactl", "set-default-sink", "virtual_sink").Run()
sessionSem = make(chan struct{}, maxConcurrentSessions)
log.Println("extractor: initialized")
}
// Stop kills PulseAudio.
func Stop() {
exec.Command("pulseaudio", "--kill").Run()
log.Println("extractor: stopped")
}

View file

@ -0,0 +1,167 @@
package extractor
import (
"fmt"
"log"
"os"
"os/exec"
"sync/atomic"
"time"
)
var displayCounter int64 = 99
func nextDisplay() int {
return int(atomic.AddInt64(&displayCounter, 1))
}
// Capture manages an Xvfb display and separate ffmpeg pipelines for video and audio.
// Audio capture is best-effort — if PulseAudio is unavailable, video still works.
type Capture struct {
display int
xvfbCmd *exec.Cmd
videoCmd *exec.Cmd
audioCmd *exec.Cmd
videoR *os.File // IVF pipe reader (VP8 frames)
audioR *os.File // OGG pipe reader (Opus frames)
}
// NewCapture starts Xvfb on the given display and two ffmpeg processes:
// one for video (x11grab → VP8/IVF) and one for audio (pulse → Opus/OGG).
// Audio is best-effort — if it fails to start, video still works and audioR
// is set to a pipe that will return EOF immediately.
func NewCapture(display, width, height int) (*Capture, error) {
c := &Capture{display: display}
// Start Xvfb
screen := fmt.Sprintf("%dx%dx24", width, height)
c.xvfbCmd = exec.Command("Xvfb", fmt.Sprintf(":%d", display),
"-screen", "0", screen, "-ac", "-nolisten", "tcp")
if err := c.xvfbCmd.Start(); err != nil {
return nil, fmt.Errorf("capture: failed to start Xvfb: %w", err)
}
// Wait for Xvfb to be ready (X11 socket must exist)
ready := false
for i := 0; i < 50; i++ {
socketPath := fmt.Sprintf("/tmp/.X11-unix/X%d", display)
if _, err := os.Stat(socketPath); err == nil {
ready = true
break
}
time.Sleep(100 * time.Millisecond)
}
if !ready {
c.xvfbCmd.Process.Kill()
c.xvfbCmd.Wait()
return nil, fmt.Errorf("capture: Xvfb did not start in time for display :%d", display)
}
// --- Video pipeline (required) ---
videoR, videoW, err := os.Pipe()
if err != nil {
c.cleanup()
return nil, fmt.Errorf("capture: video pipe: %w", err)
}
c.videoCmd = exec.Command("ffmpeg",
"-loglevel", "warning",
"-f", "x11grab", "-framerate", "30",
"-video_size", fmt.Sprintf("%dx%d", width, height),
"-i", fmt.Sprintf(":%d", display),
"-c:v", "libvpx",
"-quality", "realtime", "-cpu-used", "8",
"-deadline", "realtime", "-b:v", "2M", "-g", "30",
"-f", "ivf", "pipe:3",
)
c.videoCmd.ExtraFiles = []*os.File{videoW}
c.videoCmd.Stdout = os.Stderr
c.videoCmd.Stderr = os.Stderr
if err := c.videoCmd.Start(); err != nil {
videoR.Close()
videoW.Close()
c.cleanup()
return nil, fmt.Errorf("capture: failed to start video ffmpeg: %w", err)
}
videoW.Close()
c.videoR = videoR
go func() {
if err := c.videoCmd.Wait(); err != nil {
log.Printf("capture: video ffmpeg exited on display :%d: %v", display, err)
}
}()
// --- Audio pipeline (best-effort) ---
audioR, audioW, err := os.Pipe()
if err != nil {
log.Printf("capture: audio pipe failed on display :%d: %v (continuing without audio)", display, err)
// Provide a closed pipe so StreamAudio gets EOF immediately
r, w, _ := os.Pipe()
w.Close()
c.audioR = r
log.Printf("capture: started display :%d (%dx%d) (video only)", display, width, height)
return c, nil
}
c.audioCmd = exec.Command("ffmpeg",
"-loglevel", "warning",
"-f", "pulse", "-i", "virtual_sink.monitor",
"-c:a", "libopus",
"-b:a", "128k", "-application", "lowdelay",
"-f", "ogg", "pipe:3",
)
c.audioCmd.ExtraFiles = []*os.File{audioW}
c.audioCmd.Stdout = os.Stderr
c.audioCmd.Stderr = os.Stderr
if err := c.audioCmd.Start(); err != nil {
log.Printf("capture: audio ffmpeg failed to start on display :%d: %v (continuing without audio)", display, err)
audioR.Close()
audioW.Close()
// Provide a closed pipe so StreamAudio gets EOF immediately
r, w, _ := os.Pipe()
w.Close()
c.audioR = r
c.audioCmd = nil
log.Printf("capture: started display :%d (%dx%d) (video only)", display, width, height)
return c, nil
}
audioW.Close()
c.audioR = audioR
go func() {
if err := c.audioCmd.Wait(); err != nil {
log.Printf("capture: audio ffmpeg exited on display :%d: %v", display, err)
}
}()
log.Printf("capture: started display :%d (%dx%d) (video + audio)", display, width, height)
return c, nil
}
func (c *Capture) cleanup() {
if c.xvfbCmd != nil && c.xvfbCmd.Process != nil {
c.xvfbCmd.Process.Kill()
c.xvfbCmd.Wait()
}
}
// Close stops ffmpeg processes, Xvfb, and releases pipe resources.
func (c *Capture) Close() {
if c.videoCmd != nil && c.videoCmd.Process != nil {
c.videoCmd.Process.Kill()
}
if c.audioCmd != nil && c.audioCmd.Process != nil {
c.audioCmd.Process.Kill()
}
if c.videoR != nil {
c.videoR.Close()
}
if c.audioR != nil {
c.audioR.Close()
}
c.cleanup()
log.Printf("capture: stopped display :%d", c.display)
}

View file

@ -0,0 +1,383 @@
package extractor
import (
"context"
"encoding/json"
"fmt"
"log"
"net"
"net/http"
"os"
"os/exec"
"sync"
"time"
"github.com/chromedp/cdproto/fetch"
"github.com/chromedp/cdproto/input"
"github.com/chromedp/cdproto/network"
"github.com/chromedp/cdproto/page"
"github.com/chromedp/chromedp"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"github.com/pion/webrtc/v4"
)
const (
sessionTimeout = 5 * time.Minute
defaultViewportW = 1280
defaultViewportH = 720
turnCredentialTTL = 24 * time.Hour
)
var (
turnURL string
turnSharedSecret string
turnInternalURL string
)
// SetTURNConfig sets the TURN server URL, shared secret, and optional internal URL.
// The internal URL is used by pion (server-side) to avoid hairpin NAT issues.
// The public URL is sent to the browser client.
func SetTURNConfig(url, secret, internalURL string) {
turnURL = url
turnSharedSecret = secret
turnInternalURL = internalURL
if turnInternalURL == "" {
turnInternalURL = "turn:coturn.coturn.svc.cluster.local:3478"
}
log.Printf("extractor: TURN configured: public=%s internal=%s", url, turnInternalURL)
}
var adDomains = []string{
"doubleclick.net", "googlesyndication.com", "googleadservices.com",
"google-analytics.com", "adnxs.com", "criteo.com", "outbrain.com",
"taboola.com", "amazon-adsystem.com", "popads.net", "popcash.net",
"juicyads.com", "exoclick.com", "trafficjunky.com", "propellerads.com",
"adsterra.com", "hilltopads.net", "revcontent.com", "mgid.com",
}
type inputMsg struct {
Type string `json:"type"`
X float64 `json:"x"`
Y float64 `json:"y"`
Button int `json:"button"`
DeltaX float64 `json:"deltaX"`
DeltaY float64 `json:"deltaY"`
Key string `json:"key"`
Code string `json:"code"`
Mods int `json:"modifiers"`
Width int `json:"width"`
Height int `json:"height"`
SDP string `json:"sdp"`
Candidate *webrtc.ICECandidateInit `json:"candidate"`
}
// HandleBrowserSession upgrades to WebSocket and runs a remote browser session
// with WebRTC video/audio streaming and CDP input relay.
func HandleBrowserSession(w http.ResponseWriter, r *http.Request, pageURL string) {
// Check session capacity
select {
case sessionSem <- struct{}{}:
defer func() { <-sessionSem }()
default:
http.Error(w, `{"error":"too many active browser sessions"}`, http.StatusServiceUnavailable)
return
}
conn, _, _, err := ws.UpgradeHTTP(r, w)
if err != nil {
log.Printf("extractor: session: ws upgrade failed: %v", err)
return
}
defer conn.Close()
ctx, cancel := context.WithCancel(r.Context())
defer cancel()
// Allocate display and start capture pipeline
display := nextDisplay()
viewW, viewH := defaultViewportW, defaultViewportH
cap, err := NewCapture(display, viewW, viewH)
if err != nil {
sendWSError(conn, "failed to start capture: "+err.Error())
log.Printf("extractor: session: capture error: %v", err)
return
}
defer cap.Close()
// Start Chrome on the virtual display
opts := append(chromedp.DefaultExecAllocatorOptions[:],
chromedp.Flag("headless", false),
chromedp.Flag("no-sandbox", true),
chromedp.Flag("disable-gpu", true),
chromedp.Flag("disable-software-rasterizer", true),
chromedp.Flag("disable-dev-shm-usage", true),
chromedp.Flag("disable-extensions", true),
chromedp.Flag("disable-background-networking", true),
chromedp.ModifyCmdFunc(func(cmd *exec.Cmd) {
cmd.Env = append(os.Environ(), fmt.Sprintf("DISPLAY=:%d", display))
}),
chromedp.Flag("autoplay-policy", "no-user-gesture-required"),
chromedp.Flag("window-size", fmt.Sprintf("%d,%d", viewW, viewH)),
chromedp.WSURLReadTimeout(30 * time.Second),
)
allocCtx, allocCancel := chromedp.NewExecAllocator(ctx, opts...)
defer allocCancel()
tabCtx, tabCancel := chromedp.NewContext(allocCtx)
defer tabCancel()
var wsMu sync.Mutex
// Build ICE servers for pion (server-side) — uses internal TURN URL to avoid hairpin NAT
iceServers := []webrtc.ICEServer{
{URLs: []string{"stun:stun.l.google.com:19302"}},
}
var turnCreds *TURNCredentials
if turnURL != "" && turnSharedSecret != "" {
// Server-side: use internal k8s DNS for TURN to bypass NAT
internalCreds := GenerateTURNCredentials(turnInternalURL, turnSharedSecret, turnCredentialTTL)
turnCreds = &internalCreds
iceServers = append(iceServers, webrtc.ICEServer{
URLs: internalCreds.URLs,
Username: internalCreds.Username,
Credential: internalCreds.Credential,
CredentialType: webrtc.ICECredentialTypePassword,
})
}
// Build ad-blocking fetch patterns
adPatterns := make([]*fetch.RequestPattern, 0, len(adDomains))
for _, domain := range adDomains {
adPatterns = append(adPatterns, &fetch.RequestPattern{
URLPattern: fmt.Sprintf("*://*.%s/*", domain),
})
}
// Set up event listeners before navigation
chromedp.ListenTarget(tabCtx, func(ev interface{}) {
switch e := ev.(type) {
case *fetch.EventRequestPaused:
go chromedp.Run(tabCtx, fetch.FailRequest(e.RequestID, network.ErrorReasonBlockedByClient))
case *page.EventFrameNavigated:
if e.Frame.ParentID == "" {
go sendURLUpdate(tabCtx, conn, &wsMu, e.Frame.URL)
}
case *page.EventNavigatedWithinDocument:
go sendURLUpdate(tabCtx, conn, &wsMu, e.URL)
}
})
// Enable fetch interception (ad blocking) and navigate
if err := chromedp.Run(tabCtx,
fetch.Enable().WithPatterns(adPatterns),
chromedp.Navigate(pageURL),
chromedp.WaitReady("body"),
); err != nil {
sendWSError(conn, "navigation failed")
log.Printf("extractor: session: navigate error for %s: %v", pageURL, err)
return
}
// Create WebRTC media stream
mediaStream, err := NewMediaStream(iceServers, func(c *webrtc.ICECandidate) {
data, _ := json.Marshal(map[string]interface{}{
"type": "ice",
"candidate": c.ToJSON(),
})
wsMu.Lock()
wsutil.WriteServerMessage(conn, ws.OpText, data)
wsMu.Unlock()
}, cancel)
if err != nil {
sendWSError(conn, "WebRTC setup failed")
log.Printf("extractor: session: webrtc error: %v", err)
return
}
defer mediaStream.Close()
// Create and send SDP offer
sdp, err := mediaStream.Offer()
if err != nil {
sendWSError(conn, "WebRTC offer failed")
log.Printf("extractor: session: offer error: %v", err)
return
}
// Send ICE config to client — uses PUBLIC TURN URL (for browser to reach from internet)
clientICE := []map[string]interface{}{
{"urls": []string{"stun:stun.l.google.com:19302"}},
}
if turnCreds != nil {
// Client-side: use public IP for TURN (browser connects from internet)
publicCreds := GenerateTURNCredentials(turnURL, turnSharedSecret, turnCredentialTTL)
clientICE = append(clientICE, map[string]interface{}{
"urls": publicCreds.URLs,
"username": publicCreds.Username,
"credential": publicCreds.Credential,
})
}
iceMsg, _ := json.Marshal(map[string]interface{}{
"type": "iceServers",
"iceServers": clientICE,
})
wsMu.Lock()
wsutil.WriteServerMessage(conn, ws.OpText, iceMsg)
wsMu.Unlock()
offerMsg, _ := json.Marshal(map[string]interface{}{
"type": "offer",
"sdp": sdp,
})
wsMu.Lock()
wsutil.WriteServerMessage(conn, ws.OpText, offerMsg)
wsMu.Unlock()
// Send ready message with viewport dimensions
readyMsg, _ := json.Marshal(map[string]interface{}{
"type": "ready",
"width": viewW,
"height": viewH,
})
wsMu.Lock()
wsutil.WriteServerMessage(conn, ws.OpText, readyMsg)
wsMu.Unlock()
// Start streaming video and audio from capture pipes
go mediaStream.StreamVideo(cap.videoR, ctx)
go mediaStream.StreamAudio(cap.audioR, ctx)
log.Printf("extractor: session: started for %s (display :%d)", pageURL, display)
// Inactivity timer — cancels session after no client input
inactivity := time.NewTimer(sessionTimeout)
defer inactivity.Stop()
go func() {
select {
case <-inactivity.C:
log.Printf("extractor: session: inactivity timeout for %s", pageURL)
cancel()
case <-ctx.Done():
}
}()
// Read loop — process signaling and input messages
for {
msgs, err := wsutil.ReadClientMessage(conn, nil)
if err != nil {
break
}
for _, m := range msgs {
if m.OpCode != ws.OpText {
continue
}
// Reset inactivity timer
if !inactivity.Stop() {
select {
case <-inactivity.C:
default:
}
}
inactivity.Reset(sessionTimeout)
var msg inputMsg
if err := json.Unmarshal(m.Payload, &msg); err != nil {
continue
}
switch msg.Type {
case "answer":
if err := mediaStream.SetAnswer(msg.SDP); err != nil {
log.Printf("extractor: session: set answer error: %v", err)
}
case "ice":
if msg.Candidate != nil {
if err := mediaStream.AddICECandidate(*msg.Candidate); err != nil {
log.Printf("extractor: session: add ICE error: %v", err)
}
}
case "back":
chromedp.Run(tabCtx, chromedp.NavigateBack())
case "forward":
chromedp.Run(tabCtx, chromedp.NavigateForward())
default:
handleInput(tabCtx, &msg)
}
}
}
log.Printf("extractor: session: ended for %s", pageURL)
}
func handleInput(ctx context.Context, msg *inputMsg) {
switch msg.Type {
case "mousemove":
chromedp.Run(ctx,
input.DispatchMouseEvent(input.MouseMoved, msg.X, msg.Y))
case "mousedown":
chromedp.Run(ctx,
input.DispatchMouseEvent(input.MousePressed, msg.X, msg.Y).
WithButton(mapButton(msg.Button)).WithClickCount(1))
case "mouseup":
chromedp.Run(ctx,
input.DispatchMouseEvent(input.MouseReleased, msg.X, msg.Y).
WithButton(mapButton(msg.Button)))
case "scroll":
chromedp.Run(ctx,
input.DispatchMouseEvent(input.MouseWheel, msg.X, msg.Y).
WithDeltaX(msg.DeltaX).WithDeltaY(msg.DeltaY))
case "keydown":
chromedp.Run(ctx,
input.DispatchKeyEvent(input.KeyDown).
WithKey(msg.Key).WithCode(msg.Code).
WithModifiers(input.Modifier(msg.Mods)))
case "keyup":
chromedp.Run(ctx,
input.DispatchKeyEvent(input.KeyUp).
WithKey(msg.Key).WithCode(msg.Code).
WithModifiers(input.Modifier(msg.Mods)))
}
}
func mapButton(jsButton int) input.MouseButton {
switch jsButton {
case 1:
return input.Middle
case 2:
return input.Right
default:
return input.Left
}
}
func sendURLUpdate(tabCtx context.Context, conn net.Conn, mu *sync.Mutex, currentURL string) {
var canBack, canForward bool
var entries []*page.NavigationEntry
var currentIndex int64
if err := chromedp.Run(tabCtx, chromedp.ActionFunc(func(ctx context.Context) error {
var err error
currentIndex, entries, err = page.GetNavigationHistory().Do(ctx)
return err
})); err == nil {
canBack = currentIndex > 0
canForward = int(currentIndex) < len(entries)-1
}
data, _ := json.Marshal(map[string]interface{}{
"type": "url",
"url": currentURL,
"canBack": canBack,
"canForward": canForward,
})
mu.Lock()
wsutil.WriteServerMessage(conn, ws.OpText, data)
mu.Unlock()
}
func sendWSError(conn net.Conn, msg string) {
data, _ := json.Marshal(map[string]string{"type": "error", "message": msg})
wsutil.WriteServerMessage(conn, ws.OpText, data)
}

View file

@ -0,0 +1,248 @@
package extractor
import (
"context"
"crypto/hmac"
"crypto/sha1"
"encoding/base64"
"fmt"
"io"
"log"
"time"
"github.com/pion/webrtc/v4"
"github.com/pion/webrtc/v4/pkg/media"
"github.com/pion/webrtc/v4/pkg/media/ivfreader"
"github.com/pion/webrtc/v4/pkg/media/oggreader"
)
// TURNCredentials holds ephemeral TURN credentials generated from a shared secret.
type TURNCredentials struct {
URLs []string `json:"urls"`
Username string `json:"username"`
Credential string `json:"credential"`
}
// GenerateTURNCredentials creates time-limited TURN credentials using the
// shared secret (TURN REST API / coturn --use-auth-secret).
func GenerateTURNCredentials(turnURL, sharedSecret string, ttl time.Duration) TURNCredentials {
expiry := time.Now().Add(ttl).Unix()
username := fmt.Sprintf("%d", expiry)
mac := hmac.New(sha1.New, []byte(sharedSecret))
mac.Write([]byte(username))
credential := base64.StdEncoding.EncodeToString(mac.Sum(nil))
return TURNCredentials{
URLs: []string{turnURL},
Username: username,
Credential: credential,
}
}
// MediaStream wraps a pion WebRTC PeerConnection with VP8 video and Opus audio tracks.
type MediaStream struct {
pc *webrtc.PeerConnection
videoTrack *webrtc.TrackLocalStaticSample
audioTrack *webrtc.TrackLocalStaticSample
}
// NewMediaStream creates a PeerConnection with VP8 + Opus tracks and an ICE callback.
// The cancel function is called when ICE fails to trigger session cleanup.
func NewMediaStream(iceServers []webrtc.ICEServer, onICE func(*webrtc.ICECandidate), cancel context.CancelFunc) (*MediaStream, error) {
config := webrtc.Configuration{
ICEServers: iceServers,
}
pc, err := webrtc.NewPeerConnection(config)
if err != nil {
return nil, err
}
videoTrack, err := webrtc.NewTrackLocalStaticSample(
webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8},
"video", "stream",
)
if err != nil {
pc.Close()
return nil, err
}
audioTrack, err := webrtc.NewTrackLocalStaticSample(
webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus},
"audio", "stream",
)
if err != nil {
pc.Close()
return nil, err
}
if _, err = pc.AddTrack(videoTrack); err != nil {
pc.Close()
return nil, err
}
if _, err = pc.AddTrack(audioTrack); err != nil {
pc.Close()
return nil, err
}
pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
log.Printf("webrtc: ICE connection state: %s", state.String())
if state == webrtc.ICEConnectionStateFailed {
log.Printf("webrtc: ICE failed, cancelling session")
cancel()
return
}
if state == webrtc.ICEConnectionStateConnected {
// Log selected candidate pair
if stats := pc.GetStats(); stats != nil {
for _, s := range stats {
if cp, ok := s.(webrtc.ICECandidatePairStats); ok && cp.Nominated {
log.Printf("webrtc: selected candidate pair: local=%s remote=%s",
cp.LocalCandidateID, cp.RemoteCandidateID)
}
}
}
// Start periodic stats logging
go func() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for range ticker.C {
if pc.ICEConnectionState() != webrtc.ICEConnectionStateConnected &&
pc.ICEConnectionState() != webrtc.ICEConnectionStateCompleted {
return
}
stats := pc.GetStats()
for _, s := range stats {
if out, ok := s.(webrtc.OutboundRTPStreamStats); ok {
log.Printf("webrtc: outbound-rtp kind=%s bytes=%d packets=%d",
out.Kind, out.BytesSent, out.PacketsSent)
}
}
}
}()
}
})
pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
log.Printf("webrtc: peer connection state: %s", state.String())
})
pc.OnICECandidate(func(c *webrtc.ICECandidate) {
if c != nil {
log.Printf("webrtc: gathered ICE candidate: type=%s addr=%s:%d",
c.Typ.String(), c.Address, c.Port)
if onICE != nil {
onICE(c)
}
}
})
return &MediaStream{
pc: pc,
videoTrack: videoTrack,
audioTrack: audioTrack,
}, nil
}
// Offer creates an SDP offer, sets it as local description, and returns the SDP string.
func (m *MediaStream) Offer() (string, error) {
offer, err := m.pc.CreateOffer(nil)
if err != nil {
return "", err
}
if err := m.pc.SetLocalDescription(offer); err != nil {
return "", err
}
return offer.SDP, nil
}
// SetAnswer sets the remote SDP answer.
func (m *MediaStream) SetAnswer(sdp string) error {
return m.pc.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer,
SDP: sdp,
})
}
// AddICECandidate adds a remote ICE candidate.
func (m *MediaStream) AddICECandidate(init webrtc.ICECandidateInit) error {
return m.pc.AddICECandidate(init)
}
// StreamVideo reads VP8 frames from an IVF stream and writes them to the video track.
// Blocks until the reader returns an error or the context is cancelled.
func (m *MediaStream) StreamVideo(r io.Reader, ctx context.Context) {
ivf, _, err := ivfreader.NewWith(r)
if err != nil {
log.Printf("webrtc: ivf reader error: %v", err)
return
}
duration := time.Second / 30
for {
select {
case <-ctx.Done():
return
default:
}
frame, _, err := ivf.ParseNextFrame()
if err != nil {
if err != io.EOF {
log.Printf("webrtc: video frame error: %v", err)
}
return
}
if err := m.videoTrack.WriteSample(media.Sample{
Data: frame,
Duration: duration,
}); err != nil {
log.Printf("webrtc: video write error: %v", err)
return
}
}
}
// StreamAudio reads Opus pages from an OGG stream and writes them to the audio track.
// Blocks until the reader returns an error or the context is cancelled.
func (m *MediaStream) StreamAudio(r io.Reader, ctx context.Context) {
ogg, _, err := oggreader.NewWith(r)
if err != nil {
log.Printf("webrtc: ogg reader error: %v", err)
return
}
for {
select {
case <-ctx.Done():
return
default:
}
page, _, err := ogg.ParseNextPage()
if err != nil {
if err != io.EOF {
log.Printf("webrtc: audio page error: %v", err)
}
return
}
if err := m.audioTrack.WriteSample(media.Sample{
Data: page,
Duration: 20 * time.Millisecond,
}); err != nil {
log.Printf("webrtc: audio write error: %v", err)
return
}
}
}
// Close closes the underlying PeerConnection.
func (m *MediaStream) Close() {
if m.pc != nil {
m.pc.Close()
}
}