From 7edfc1894d514f217676180cce81256b218d9aff Mon Sep 17 00:00:00 2001 From: Benjamin Liebkemann <82483776+BounceU@users.noreply.github.com> Date: Wed, 10 Dec 2025 09:27:19 -0500 Subject: [PATCH] streaming over webrtc along a strand, with nodes ordering themselves --- public/client.js | 400 +++++++++++++++++++++++++++------------------- public/index.html | 146 ++++------------- server.js | 172 ++++++++++++++++---- 3 files changed, 405 insertions(+), 313 deletions(-) diff --git a/public/client.js b/public/client.js index e81a40b..9e7e817 100644 --- a/public/client.js +++ b/public/client.js @@ -1,193 +1,267 @@ const socket = io(); -let localStream; -let peerConnection; // For Viewers: connection to the streamer -let viewerConnections = {}; // For Streamers: map of { socketId: RTCPeerConnection } -let cloudflareIceServers = []; +const localVideo = document.getElementById('localVideo'); +const remoteVideo = document.getElementById('remoteVideo'); +const statusDiv = document.getElementById('status'); -const videoDisplay = document.getElementById('videoDisplay'); -const placeholder = document.getElementById('videoPlaceholder'); -const streamListDiv = document.getElementById('streamList'); -const goLiveBtn = document.getElementById('goLiveBtn'); -const stopLiveBtn = document.getElementById('stopLiveBtn'); +// --- Global State --- +let localStream = null; +let iceServers = []; -// 1. Initialize on Load (Fetch Keys & Listen for streams) -(async function init() { +// ACTIVE connections +let currentUpstreamPC = null; +let currentDownstreamPC = null; +let downstreamId = null; // Socket ID of who we are sending to + +// FADING connections (Garbage bin for smooth transitions) +let oldUpstreamPCs = []; +let oldDownstreamPCs = []; + +// --- 1. Initialization --- +async function init() { try { - const res = await fetch('/api/get-turn-credentials'); - const data = await res.json(); - cloudflareIceServers = data.iceServers; - console.log("ICE Servers loaded."); + const response = await fetch('/api/get-turn-credentials'); + const data = await response.json(); + iceServers = data.iceServers; + console.log("Loaded ICE Servers"); } catch (e) { - console.error("Failed to load ICE servers:", e); + console.error("Using public STUN"); + iceServers = [{ urls: 'stun:stun.l.google.com:19302' }]; } -})(); +} +init(); -// --- VIEWER LOGIC (Default) --- +// --- 2. User Actions --- +async function startStream() { + try { + localStream = await navigator.mediaDevices.getUserMedia({ video: true, audio: true }); + localVideo.srcObject = localStream; + remoteVideo.style.display = 'none'; // Head doesn't need remote view -// Listen for updates to the list of available streams -socket.on('streamer_list_update', (streamers) => { - streamListDiv.innerHTML = ''; - const ids = Object.keys(streamers); + socket.emit('start_stream'); + statusDiv.innerText = "Status: Broadcasting (Head)"; + } catch (err) { + console.error("Error accessing media:", err); + alert("Could not access camera."); + } +} - if (ids.length === 0) { - streamListDiv.innerHTML = '
No active streams.
'; +function joinStream() { + socket.emit('join_stream'); + localVideo.style.display = 'none'; // Nodes don't see themselves + statusDiv.innerText = "Status: Joining chain..."; +} + +// --- 3. Health Reporting (The "Bubble Sort" Logic) --- +setInterval(calculateAndReportHealth, 5000); + +async function calculateAndReportHealth() { + // If I am Head, I am perfect. + if (localStream) { + socket.emit('update_score', 100); return; } - ids.forEach(id => { - if (id === socket.id) return; // Don't list myself + // If I'm not connected, I'm waiting. + if (!currentUpstreamPC) return; - const div = document.createElement('div'); - div.className = 'stream-item'; - div.innerHTML = ` - ${streamers[id].name} - - `; - streamListDiv.appendChild(div); - }); + try { + const stats = await currentUpstreamPC.getStats(); + let packetsLost = 0; + let packetsReceived = 0; + let jitter = 0; + + stats.forEach(report => { + if (report.type === 'inbound-rtp' && report.kind === 'video') { + packetsLost = report.packetsLost || 0; + packetsReceived = report.packetsReceived || 0; + jitter = report.jitter || 0; + } + }); + + const totalPackets = packetsReceived + packetsLost; + if (totalPackets === 0) return; + + // Calculate Score (0-100) + // Heavy penalty for loss, mild for jitter + const lossRate = packetsLost / totalPackets; + const score = 100 - (lossRate * 100 * 5) - (jitter * 100); + + const finalScore = Math.max(0, Math.min(100, score)); + console.log(`Health: ${finalScore.toFixed(0)} | Loss: ${(lossRate * 100).toFixed(2)}%`); + + socket.emit('update_score', finalScore); + + } catch (e) { + // console.error("Stats error", e); + } +} + +// --- 4. Socket Events --- + +// Instruction: Connect to a new node downstream +socket.on('connect_to_downstream', async ({ downstreamId: targetId }) => { + console.log(`Instruction: Connect downstream to ${targetId}`); + downstreamId = targetId; + await setupDownstreamConnection(targetId); }); -// User clicked "Watch" on a stream -async function watchStream(streamerId) { - if (localStream) { - alert("You cannot watch while streaming!"); - return; +// Instruction: Stop sending downstream (I am now the tail) +socket.on('disconnect_downstream', () => { + console.log("Instruction: Disconnect downstream (Became Tail)"); + if (currentDownstreamPC) { + currentDownstreamPC.close(); + currentDownstreamPC = null; + downstreamId = null; + } + // Also clean up garbage bin + oldDownstreamPCs.forEach(pc => pc.close()); + oldDownstreamPCs = []; +}); + +// Signaling Router +socket.on('signal_msg', async ({ sender, type, sdp, candidate }) => { + // A. OFFERS (Always come from Upstream) + if (type === 'offer') { + await handleUpstreamOffer(sender, sdp); } - // UI Updates - placeholder.classList.add('hidden'); - videoDisplay.classList.remove('hidden'); - videoDisplay.muted = false; // Unmute for viewer - - // Tell server we want to join - console.log("Requesting to join stream:", streamerId); - socket.emit('join_stream', streamerId); - - // Prepare to receive the Offer - socket.off('webrtc_offer'); // Clean previous listeners - socket.on('webrtc_offer', async ({ sdp, sender }) => { - if (sender !== streamerId) return; - - console.log("Received Offer from streamer."); - peerConnection = new RTCPeerConnection({ iceServers: cloudflareIceServers }); - - // When we get tracks (video/audio), show them - peerConnection.ontrack = (event) => { - console.log("Track received"); - videoDisplay.srcObject = event.streams[0]; - }; - - // Handle ICE candidates to send back to streamer - peerConnection.onicecandidate = (event) => { - if (event.candidate) { - socket.emit('ice_candidate', { target: sender, candidate: event.candidate }); - } - }; - - await peerConnection.setRemoteDescription(new RTCSessionDescription(sdp)); - const answer = await peerConnection.createAnswer(); - await peerConnection.setLocalDescription(answer); - - socket.emit('webrtc_answer', { target: sender, sdp: answer }); - }); - - // Handle ICE candidates from streamer - socket.on('ice_candidate', async ({ candidate, sender }) => { - if (peerConnection && sender === streamerId) { - await peerConnection.addIceCandidate(new RTCIceCandidate(candidate)); + // B. ANSWERS (Always come from Downstream) + else if (type === 'answer') { + if (currentDownstreamPC && sender === downstreamId) { + await currentDownstreamPC.setRemoteDescription(new RTCSessionDescription(sdp)); } - }); -} - - -// --- STREAMER LOGIC --- - -async function promptForStream() { - const name = prompt("Enter a name for your stream:", "My Cool Stream"); - if (!name) return; - - startStreaming(name); -} - -async function startStreaming(streamName) { - try { - // 1. Get User Media - localStream = await navigator.mediaDevices.getUserMedia({ video: true, audio: true }); - - // 2. Update UI - videoDisplay.srcObject = localStream; - videoDisplay.classList.remove('hidden'); - placeholder.classList.add('hidden'); - videoDisplay.muted = true; // Mute self to avoid echo - goLiveBtn.classList.add('hidden'); - stopLiveBtn.classList.remove('hidden'); - - // 3. Announce to Server - socket.emit('start_stream', streamName); - console.log("Stream announced:", streamName); - - // 4. Handle Incoming Viewers - socket.on('viewer_joined', async ({ viewerId }) => { - console.log("New viewer joined:", viewerId); - createStreamerConnection(viewerId); - }); - - // 5. Handle Answers from Viewers - socket.on('webrtc_answer', async ({ sdp, sender }) => { - const pc = viewerConnections[sender]; - if (pc) { - await pc.setRemoteDescription(new RTCSessionDescription(sdp)); - } - }); - - // 6. Handle ICE Candidates from Viewers - socket.on('ice_candidate', async ({ candidate, sender }) => { - const pc = viewerConnections[sender]; - if (pc) { - await pc.addIceCandidate(new RTCIceCandidate(candidate)); - } - }); - - // NEW: Handle instant disconnects - socket.on('viewer_left', ({ viewerId }) => { - console.log(`Viewer ${viewerId} left via socket disconnect.`); - const pc = viewerConnections[viewerId]; - if (pc) { - pc.close(); // Stop sending data immediately - delete viewerConnections[viewerId]; - } - }); - - } catch (err) { - console.error("Error starting stream:", err); - alert("Could not access camera/microphone."); } -} -function createStreamerConnection(viewerId) { - const pc = new RTCPeerConnection({ iceServers: cloudflareIceServers }); - viewerConnections[viewerId] = pc; + // C. CANDIDATES (Could be for anyone) + else if (type === 'candidate') { + const ice = new RTCIceCandidate(candidate); - // Add local stream tracks to this connection - localStream.getTracks().forEach(track => pc.addTrack(track, localStream)); + // Try Active Downstream + if (currentDownstreamPC && sender === downstreamId) { + currentDownstreamPC.addIceCandidate(ice).catch(e => { }); + } + // Try Active Upstream + if (currentUpstreamPC) { + currentUpstreamPC.addIceCandidate(ice).catch(e => { }); + } + // Try Fading Connections (Crucial for smooth swap!) + oldUpstreamPCs.forEach(pc => pc.addIceCandidate(ice).catch(e => { })); + oldDownstreamPCs.forEach(pc => pc.addIceCandidate(ice).catch(e => { })); + } +}); - // Send ICE candidates to this specific viewer - pc.onicecandidate = (event) => { +socket.on('error_msg', (msg) => alert(msg)); +socket.on('request_keyframe', () => console.log("Keyframe requested by network")); +socket.on('stream_ended', () => { + alert("Stream ended"); + location.reload(); +}); + +// --- 5. WebRTC Logic (Make-Before-Break) --- + +// --- UPSTREAM Handling (Receiving) --- +async function handleUpstreamOffer(senderId, sdp) { + console.log(`Negotiating new Upstream connection from ${senderId}`); + + const newPC = new RTCPeerConnection({ iceServers }); + + newPC.ontrack = (event) => { + console.log(`New Upstream Track (${event.track.kind}) Active.`); + + // A. Update Screen (Idempotent: doing this twice is fine) + remoteVideo.srcObject = event.streams[0]; + statusDiv.innerText = "Status: Connected (Stable)"; + + // B. Hot-Swap the Relay + if (currentDownstreamPC) { + const sender = currentDownstreamPC.getSenders().find(s => s.track && s.track.kind === event.track.kind); + if (sender) { + sender.replaceTrack(event.track); + } + } + + // C. Retire the OLD connection (THE FIX IS HERE) + // We only queue the old PC if it exists AND it isn't the one we just created. + if (currentUpstreamPC && currentUpstreamPC !== newPC) { + const oldPC = currentUpstreamPC; + oldUpstreamPCs.push(oldPC); + + console.log("Queueing old upstream connection for death in 4s..."); + setTimeout(() => { + oldPC.close(); + // Remove from garbage bin + oldUpstreamPCs = oldUpstreamPCs.filter(pc => pc !== oldPC); + console.log("Closed old upstream connection."); + }, 4000); + } + + // D. Set New as Current + currentUpstreamPC = newPC; + }; + + newPC.onicecandidate = (event) => { if (event.candidate) { - socket.emit('ice_candidate', { target: viewerId, candidate: event.candidate }); + socket.emit('signal_msg', { target: senderId, type: 'candidate', candidate: event.candidate }); } }; - // Create Offer - pc.createOffer().then(offer => { - pc.setLocalDescription(offer); - socket.emit('webrtc_offer', { target: viewerId, sdp: offer }); - }); + await newPC.setRemoteDescription(new RTCSessionDescription(sdp)); + const answer = await newPC.createAnswer(); + await newPC.setLocalDescription(answer); - // Cleanup on disconnect - pc.onconnectionstatechange = () => { - if (pc.connectionState === 'disconnected' || pc.connectionState === 'failed') { - delete viewerConnections[viewerId]; + socket.emit('signal_msg', { target: senderId, type: 'answer', sdp: answer }); +} + +// --- DOWNSTREAM Handling (Sending) --- +async function setupDownstreamConnection(targetId) { + console.log(`Setting up downstream to ${targetId}`); + + // 1. Retire existing downstream (Fade Out) + if (currentDownstreamPC) { + console.log("Moving current downstream to background (Overlap Mode)"); + const oldPC = currentDownstreamPC; + oldDownstreamPCs.push(oldPC); + + // Keep sending for 5 seconds so they have time to connect to their NEW source + setTimeout(() => { + oldPC.close(); + oldDownstreamPCs = oldDownstreamPCs.filter(pc => pc !== oldPC); + console.log("Closed old downstream connection"); + }, 5000); + } + + // 2. Create NEW downstream + currentDownstreamPC = new RTCPeerConnection({ iceServers }); + + // 3. Add Tracks + if (localStream) { + // Head: Send Camera + localStream.getTracks().forEach(track => currentDownstreamPC.addTrack(track, localStream)); + } else if (currentUpstreamPC) { + // Relay: Send what we are receiving + currentUpstreamPC.getReceivers().forEach(receiver => { + if (receiver.track) { + // "remoteVideo.srcObject" ensures stream ID consistency + currentDownstreamPC.addTrack(receiver.track, remoteVideo.srcObject); + } + }); + } + + currentDownstreamPC.onicecandidate = (event) => { + if (event.candidate) { + socket.emit('signal_msg', { target: targetId, type: 'candidate', candidate: event.candidate }); } }; + + const offer = await currentDownstreamPC.createOffer(); + + // 4. BITRATE HACK: Force 4Mbps limit (Standard WebRTC defaults low) + offer.sdp = offer.sdp.replace(/b=AS:([0-9]+)/g, 'b=AS:4000'); + if (!offer.sdp.includes('b=AS:')) { + // If no bandwidth line exists, add it to the video section + offer.sdp = offer.sdp.replace(/(m=video.*\r\n)/, '$1b=AS:4000\r\n'); + } + + await currentDownstreamPC.setLocalDescription(offer); + socket.emit('signal_msg', { target: targetId, type: 'offer', sdp: offer }); } \ No newline at end of file diff --git a/public/index.html b/public/index.html index d5f6e61..961bc94 100644 --- a/public/index.html +++ b/public/index.html @@ -3,138 +3,50 @@ -