From 421b63304545cd4e3e0dec7b1c3960fbe91bff8b Mon Sep 17 00:00:00 2001 From: Benjamin Liebkemann <82483776+BounceU@users.noreply.github.com> Date: Wed, 10 Dec 2025 10:42:29 -0500 Subject: [PATCH] Streams done, Monitoring done, unified UI --- public/client.js | 241 ++++++++++++++++++++++------------------- public/index.html | 256 ++++++++++++++++++++++++++++++++++++++++---- public/monitor.html | 214 ++++++++++++++++++++++++++++++++++++ public/monitor.js | 141 ++++++++++++++++++++++++ server.js | 230 +++++++++++++++++++++++---------------- 5 files changed, 856 insertions(+), 226 deletions(-) create mode 100644 public/monitor.html create mode 100644 public/monitor.js diff --git a/public/client.js b/public/client.js index 9e7e817..434e016 100644 --- a/public/client.js +++ b/public/client.js @@ -2,6 +2,10 @@ const socket = io(); const localVideo = document.getElementById('localVideo'); const remoteVideo = document.getElementById('remoteVideo'); const statusDiv = document.getElementById('status'); +const streamListUl = document.getElementById('streamList'); +const lobbyView = document.getElementById('lobby-view'); +const streamView = document.getElementById('stream-view'); +const streamTitle = document.getElementById('currentStreamTitle'); // --- Global State --- let localStream = null; @@ -10,9 +14,9 @@ let iceServers = []; // ACTIVE connections let currentUpstreamPC = null; let currentDownstreamPC = null; -let downstreamId = null; // Socket ID of who we are sending to +let downstreamId = null; -// FADING connections (Garbage bin for smooth transitions) +// FADING connections let oldUpstreamPCs = []; let oldDownstreamPCs = []; @@ -22,46 +26,116 @@ async function init() { const response = await fetch('/api/get-turn-credentials'); const data = await response.json(); iceServers = data.iceServers; - console.log("Loaded ICE Servers"); } catch (e) { - console.error("Using public STUN"); iceServers = [{ urls: 'stun:stun.l.google.com:19302' }]; } } init(); -// --- 2. User Actions --- -async function startStream() { - try { - localStream = await navigator.mediaDevices.getUserMedia({ video: true, audio: true }); - localVideo.srcObject = localStream; - remoteVideo.style.display = 'none'; // Head doesn't need remote view +// --- 2. Lobby Logic --- + +// Receive list of streams from server +socket.on('stream_list_update', (streams) => { + streamListUl.innerHTML = ""; + if (streams.length === 0) { + streamListUl.innerHTML = "
  • No active streams. Start one!
  • "; + return; + } + + streams.forEach(name => { + const li = document.createElement('li'); + li.innerText = `${name}`; + li.onclick = () => joinStream(name); + streamListUl.appendChild(li); + }); +}); + +async function startStream() { + const name = document.getElementById('streamNameInput').value; + const fileInput = document.getElementById('videoFileInput'); + const file = fileInput.files[0]; + + if (!name) return alert("Please enter a name"); + + try { + if (file) { + // --- OPTION A: VIDEO FILE MODE --- + console.log("Starting stream from video file..."); + + // 1. Create a URL for the local file + const fileURL = URL.createObjectURL(file); + + // 2. Set it to the local video element + localVideo.src = fileURL; + localVideo.loop = true; // <--- HERE IS THE LOOP LOGIC + localVideo.muted = false; // Must be unmuted to capture audio, use headphones! + localVideo.volume = 1.0; + + // 3. Play the video (required before capturing) + await localVideo.play(); + + // 4. Capture the stream from the video element + // (Chrome/Edge use captureStream, Firefox uses mozCaptureStream) + if (localVideo.captureStream) { + localStream = localVideo.captureStream(); + } else if (localVideo.mozCaptureStream) { + localStream = localVideo.mozCaptureStream(); + } else { + return alert("Your browser does not support capturing video from files."); + } + + // Note: captureStream() sometimes doesn't capture audio if the element is muted. + // If you want to stream audio, you must hear it locally too. + + } else { + // --- OPTION B: WEBCAM MODE --- + console.log("Starting stream from Webcam..."); + localStream = await navigator.mediaDevices.getUserMedia({ video: true, audio: true }); + localVideo.srcObject = localStream; + localVideo.muted = true; // Mute local webcam to avoid feedback loop + } + + // --- COMMON LOGIC --- + + // UI Switch + lobbyView.style.display = 'none'; + streamView.style.display = 'block'; + remoteVideo.style.display = 'none'; // Broadcaster doesn't see remote + streamTitle.innerText = `Broadcasting: ${name}`; + + socket.emit('start_stream', name); + statusDiv.innerText = `Status: Broadcasting (Head) | ID: ${socket.id}`; - socket.emit('start_stream'); - statusDiv.innerText = "Status: Broadcasting (Head)"; } catch (err) { - console.error("Error accessing media:", err); - alert("Could not access camera."); + console.error(err); + alert("Failed to start stream: " + err.message); } } -function joinStream() { - socket.emit('join_stream'); - localVideo.style.display = 'none'; // Nodes don't see themselves - statusDiv.innerText = "Status: Joining chain..."; +function joinStream(name) { + // UI Switch + lobbyView.style.display = 'none'; + streamView.style.display = 'block'; + localVideo.style.display = 'none'; // Viewers don't see themselves + streamTitle.innerText = `Watching: ${name}`; + + socket.emit('join_stream', name); + statusDiv.innerText = `Status: Joining chain... | ID: ${socket.id}`; } -// --- 3. Health Reporting (The "Bubble Sort" Logic) --- +function leaveStream() { + location.reload(); // Simple way to reset state and go back to lobby +} + + +// --- 3. Health Reporting (Unchanged) --- setInterval(calculateAndReportHealth, 5000); async function calculateAndReportHealth() { - // If I am Head, I am perfect. if (localStream) { socket.emit('update_score', 100); return; } - - // If I'm not connected, I'm waiting. if (!currentUpstreamPC) return; try { @@ -81,186 +155,135 @@ async function calculateAndReportHealth() { const totalPackets = packetsReceived + packetsLost; if (totalPackets === 0) return; - // Calculate Score (0-100) - // Heavy penalty for loss, mild for jitter const lossRate = packetsLost / totalPackets; const score = 100 - (lossRate * 100 * 5) - (jitter * 100); - const finalScore = Math.max(0, Math.min(100, score)); console.log(`Health: ${finalScore.toFixed(0)} | Loss: ${(lossRate * 100).toFixed(2)}%`); socket.emit('update_score', finalScore); - - } catch (e) { - // console.error("Stats error", e); - } + } catch (e) { } } -// --- 4. Socket Events --- +// --- 4. Socket Events (Logic Updated for Smart Disconnect) --- -// Instruction: Connect to a new node downstream socket.on('connect_to_downstream', async ({ downstreamId: targetId }) => { - console.log(`Instruction: Connect downstream to ${targetId}`); downstreamId = targetId; await setupDownstreamConnection(targetId); }); -// Instruction: Stop sending downstream (I am now the tail) socket.on('disconnect_downstream', () => { - console.log("Instruction: Disconnect downstream (Became Tail)"); if (currentDownstreamPC) { currentDownstreamPC.close(); currentDownstreamPC = null; downstreamId = null; } - // Also clean up garbage bin oldDownstreamPCs.forEach(pc => pc.close()); oldDownstreamPCs = []; }); -// Signaling Router socket.on('signal_msg', async ({ sender, type, sdp, candidate }) => { - // A. OFFERS (Always come from Upstream) if (type === 'offer') { await handleUpstreamOffer(sender, sdp); - } - - // B. ANSWERS (Always come from Downstream) - else if (type === 'answer') { + } else if (type === 'answer') { if (currentDownstreamPC && sender === downstreamId) { await currentDownstreamPC.setRemoteDescription(new RTCSessionDescription(sdp)); } - } - - // C. CANDIDATES (Could be for anyone) - else if (type === 'candidate') { + } else if (type === 'candidate') { const ice = new RTCIceCandidate(candidate); - - // Try Active Downstream - if (currentDownstreamPC && sender === downstreamId) { - currentDownstreamPC.addIceCandidate(ice).catch(e => { }); - } - // Try Active Upstream - if (currentUpstreamPC) { - currentUpstreamPC.addIceCandidate(ice).catch(e => { }); - } - // Try Fading Connections (Crucial for smooth swap!) + if (currentDownstreamPC && sender === downstreamId) currentDownstreamPC.addIceCandidate(ice).catch(e => { }); + if (currentUpstreamPC) currentUpstreamPC.addIceCandidate(ice).catch(e => { }); oldUpstreamPCs.forEach(pc => pc.addIceCandidate(ice).catch(e => { })); oldDownstreamPCs.forEach(pc => pc.addIceCandidate(ice).catch(e => { })); } }); -socket.on('error_msg', (msg) => alert(msg)); -socket.on('request_keyframe', () => console.log("Keyframe requested by network")); -socket.on('stream_ended', () => { - alert("Stream ended"); +socket.on('error_msg', (msg) => { + alert(msg); location.reload(); }); -// --- 5. WebRTC Logic (Make-Before-Break) --- +socket.on('stream_ended', () => { + alert("Stream ended by host"); + location.reload(); +}); + +socket.on('request_keyframe', () => { + console.log("Network requested keyframe"); + // If we were using Insertable streams, we'd need to handle this. + // With Standard API, the browser handles PLI automatically. +}); + +// --- 5. WebRTC Logic (Merged Smart Disconnect) --- -// --- UPSTREAM Handling (Receiving) --- async function handleUpstreamOffer(senderId, sdp) { - console.log(`Negotiating new Upstream connection from ${senderId}`); - const newPC = new RTCPeerConnection({ iceServers }); + // Safety: If connection hangs, kill old one eventually + let safetyTimer = setTimeout(() => { + if (currentUpstreamPC && currentUpstreamPC !== newPC) { + currentUpstreamPC.close(); + } + }, 15000); + newPC.ontrack = (event) => { - console.log(`New Upstream Track (${event.track.kind}) Active.`); + clearTimeout(safetyTimer); // Success! - // A. Update Screen (Idempotent: doing this twice is fine) remoteVideo.srcObject = event.streams[0]; - statusDiv.innerText = "Status: Connected (Stable)"; + statusDiv.innerText = `Status: Connected | ID: ${socket.id}`; - // B. Hot-Swap the Relay if (currentDownstreamPC) { const sender = currentDownstreamPC.getSenders().find(s => s.track && s.track.kind === event.track.kind); - if (sender) { - sender.replaceTrack(event.track); - } + if (sender) sender.replaceTrack(event.track); } - // C. Retire the OLD connection (THE FIX IS HERE) - // We only queue the old PC if it exists AND it isn't the one we just created. + // Smart Disconnect: Old connection dies immediately upon success if (currentUpstreamPC && currentUpstreamPC !== newPC) { const oldPC = currentUpstreamPC; - oldUpstreamPCs.push(oldPC); - - console.log("Queueing old upstream connection for death in 4s..."); setTimeout(() => { oldPC.close(); - // Remove from garbage bin oldUpstreamPCs = oldUpstreamPCs.filter(pc => pc !== oldPC); - console.log("Closed old upstream connection."); - }, 4000); + }, 1000); } - - // D. Set New as Current currentUpstreamPC = newPC; }; newPC.onicecandidate = (event) => { - if (event.candidate) { - socket.emit('signal_msg', { target: senderId, type: 'candidate', candidate: event.candidate }); - } + if (event.candidate) socket.emit('signal_msg', { target: senderId, type: 'candidate', candidate: event.candidate }); }; await newPC.setRemoteDescription(new RTCSessionDescription(sdp)); const answer = await newPC.createAnswer(); await newPC.setLocalDescription(answer); - socket.emit('signal_msg', { target: senderId, type: 'answer', sdp: answer }); } -// --- DOWNSTREAM Handling (Sending) --- async function setupDownstreamConnection(targetId) { - console.log(`Setting up downstream to ${targetId}`); - - // 1. Retire existing downstream (Fade Out) if (currentDownstreamPC) { - console.log("Moving current downstream to background (Overlap Mode)"); const oldPC = currentDownstreamPC; oldDownstreamPCs.push(oldPC); - - // Keep sending for 5 seconds so they have time to connect to their NEW source setTimeout(() => { oldPC.close(); oldDownstreamPCs = oldDownstreamPCs.filter(pc => pc !== oldPC); - console.log("Closed old downstream connection"); }, 5000); } - // 2. Create NEW downstream currentDownstreamPC = new RTCPeerConnection({ iceServers }); - // 3. Add Tracks if (localStream) { - // Head: Send Camera localStream.getTracks().forEach(track => currentDownstreamPC.addTrack(track, localStream)); } else if (currentUpstreamPC) { - // Relay: Send what we are receiving currentUpstreamPC.getReceivers().forEach(receiver => { - if (receiver.track) { - // "remoteVideo.srcObject" ensures stream ID consistency - currentDownstreamPC.addTrack(receiver.track, remoteVideo.srcObject); - } + if (receiver.track) currentDownstreamPC.addTrack(receiver.track, remoteVideo.srcObject); }); } currentDownstreamPC.onicecandidate = (event) => { - if (event.candidate) { - socket.emit('signal_msg', { target: targetId, type: 'candidate', candidate: event.candidate }); - } + if (event.candidate) socket.emit('signal_msg', { target: targetId, type: 'candidate', candidate: event.candidate }); }; const offer = await currentDownstreamPC.createOffer(); - - // 4. BITRATE HACK: Force 4Mbps limit (Standard WebRTC defaults low) offer.sdp = offer.sdp.replace(/b=AS:([0-9]+)/g, 'b=AS:4000'); - if (!offer.sdp.includes('b=AS:')) { - // If no bandwidth line exists, add it to the video section - offer.sdp = offer.sdp.replace(/(m=video.*\r\n)/, '$1b=AS:4000\r\n'); - } + if (!offer.sdp.includes('b=AS:')) offer.sdp = offer.sdp.replace(/(m=video.*\r\n)/, '$1b=AS:4000\r\n'); await currentDownstreamPC.setLocalDescription(offer); socket.emit('signal_msg', { target: targetId, type: 'offer', sdp: offer }); diff --git a/public/index.html b/public/index.html index 961bc94..be90293 100644 --- a/public/index.html +++ b/public/index.html @@ -3,50 +3,262 @@ - - P2P Daisy Chain Livestream + + Strandcast -

    Daisy Chain Relay

    -
    - - + +
    +
    +

    Join a Broadcast

    +
    +

    Active Streams

    +
      +
    • Searching...
    • +
    +
    +
    +

    Go Live

    + +
    + +
    + + +
    +
    +
    +

    Stream Name

    + + +
    Status: Connecting...
    + +
    - - -
    Status: Disconnected
    diff --git a/public/monitor.html b/public/monitor.html new file mode 100644 index 0000000..55bc8ae --- /dev/null +++ b/public/monitor.html @@ -0,0 +1,214 @@ + + + + + + + Strandcast Monitor + + + + + +

    Network Status

    +
    Connecting...
    + + + + + \ No newline at end of file diff --git a/public/monitor.js b/public/monitor.js new file mode 100644 index 0000000..433a093 --- /dev/null +++ b/public/monitor.js @@ -0,0 +1,141 @@ +const socket = io(); +const dashboard = document.getElementById('dashboard'); + +// Identify as a monitor +socket.emit('join_monitor'); + +socket.on('monitor_update', ({ streams, scores }) => { + updateDashboard(streams, scores); +}); + +function updateDashboard(streams, scores) { + const activeStreamNames = Object.keys(streams); + + // 1. Remove streams that no longer exist + const existingContainers = document.querySelectorAll('.stream-container'); + existingContainers.forEach(container => { + const name = container.getAttribute('data-stream'); + if (!streams[name]) { + container.remove(); + } + }); + + // 2. Add or Update streams + if (activeStreamNames.length === 0) { + // If empty, show message (only if not already showing it) + if (!dashboard.innerText.includes("No Active Streams")) { + dashboard.innerHTML = "
    No Active Streams
    "; + } + return; + } else { + // FIX: Check if we are still showing a text message (Connecting, No Active Streams, etc) + // If we have streams but no stream containers yet, clear the text. + if (!dashboard.querySelector('.stream-container')) { + dashboard.innerHTML = ""; + } + } + + activeStreamNames.forEach(name => { + let container = document.getElementById(`stream-${name}`); + const chain = streams[name]; + + // Create Container if it doesn't exist + if (!container) { + container = document.createElement('div'); + container.id = `stream-${name}`; + container.className = 'stream-container'; + container.setAttribute('data-stream', name); + + // Structure + const title = document.createElement('div'); + title.className = 'stream-title'; + + const wrapper = document.createElement('div'); + wrapper.className = 'chain-wrapper'; + + const visual = document.createElement('div'); + visual.className = 'chain-visual'; + + wrapper.appendChild(visual); + container.appendChild(title); + container.appendChild(wrapper); + dashboard.appendChild(container); + } + + // Update Title + const titleEl = container.querySelector('.stream-title'); + titleEl.innerText = `Stream: ${name} (${chain.length} nodes)`; + + // Update Nodes + updateChainVisual(container.querySelector('.chain-visual'), chain, scores); + }); +} + +function updateChainVisual(visualContainer, chain, scores) { + const existingNodes = Array.from(visualContainer.querySelectorAll('.node-wrapper')); + const nodeMap = {}; + existingNodes.forEach(el => nodeMap[el.getAttribute('data-id')] = el); + + const processedIds = new Set(); + + chain.forEach((socketId, index) => { + processedIds.add(socketId); + + const score = scores[socketId] !== undefined ? scores[socketId] : '??'; + let healthClass = 'meh'; + if (score >= 80) healthClass = 'healthy'; + if (score < 50) healthClass = 'weak'; + if (index === 0) healthClass = 'healthy'; + + const role = index === 0 ? "SOURCE" : (index === chain.length - 1 ? "VIEWER" : "RELAY"); + const shortId = socketId.substring(0, 4); + + let nodeWrapper = nodeMap[socketId]; + + // --- CREATE --- + if (!nodeWrapper) { + nodeWrapper = document.createElement('div'); + nodeWrapper.className = 'node-wrapper'; + nodeWrapper.setAttribute('data-id', socketId); + nodeWrapper.style.display = 'flex'; + nodeWrapper.style.alignItems = 'center'; + + nodeWrapper.innerHTML = ` +
    +
    +
    +
    +
    +
    + `; + + visualContainer.appendChild(nodeWrapper); + } + + // --- UPDATE --- + // 1. Order (Preserve Scroll) + if (visualContainer.children[index] !== nodeWrapper) { + visualContainer.insertBefore(nodeWrapper, visualContainer.children[index]); + } + + // 2. Arrow Visibility + const arrow = nodeWrapper.querySelector('.arrow'); + arrow.style.opacity = index === 0 ? "0" : "1"; + + // 3. Data + const nodeEl = nodeWrapper.querySelector('.node'); + nodeEl.className = `node ${healthClass}`; + + nodeWrapper.querySelector('.node-role').innerText = role; + nodeWrapper.querySelector('.node-id').innerText = shortId; + nodeWrapper.querySelector('.node-score').innerText = score; + }); + + // --- REMOVE --- + existingNodes.forEach(el => { + const id = el.getAttribute('data-id'); + if (!processedIds.has(id)) { + el.remove(); + } + }); +} \ No newline at end of file diff --git a/server.js b/server.js index 805144d..30e0fa9 100644 --- a/server.js +++ b/server.js @@ -21,53 +21,66 @@ app.get('/api/get-turn-credentials', async (req, res) => { res.json(response.data); } catch (error) { console.error("Cloudflare Error:", error.message); - res.status(500).json({ error: "Failed to fetch credentials" }); + res.json({ iceServers: [{ urls: 'stun:stun.l.google.com:19302' }] }); } }); -// --- Daisy Chain Logic --- -let strand = []; // Ordered list of socket IDs -let scores = {}; // Map: socket.id -> score (0-100) +// --- STATE MANAGEMENT --- +// Format: { "StreamName": [socketId1, socketId2, ...] } +const streams = {}; +const socketStreamMap = {}; // Helper: socketId -> StreamName (for fast lookups on disconnect) +const scores = {}; // socketId -> Health Score (0-100) io.on('connection', (socket) => { console.log(`User connected: ${socket.id}`); - - // Default score for new users (optimistic, so they get a chance to prove themselves) scores[socket.id] = 80; - socket.on('update_score', (score) => { - scores[socket.id] = score; - }); + // 0. Send the list of streams to the new user + socket.emit('stream_list_update', Object.keys(streams)); - // A. Start Stream (Head) - socket.on('start_stream', () => { - if (strand.length > 0) { - socket.emit('error_msg', "Stream already in progress."); + // A. Start New Stream + socket.on('start_stream', (streamName) => { + if (streams[streamName]) { + socket.emit('error_msg', "Stream name already exists."); return; } - strand.push(socket.id); - console.log("Stream started. Head:", socket.id); + if (socketStreamMap[socket.id]) { + socket.emit('error_msg', "You are already in a stream."); + return; + } + + streams[streamName] = [socket.id]; + socketStreamMap[socket.id] = streamName; + + console.log(`Stream started: ${streamName} by ${socket.id}`); socket.emit('role_assigned', 'head'); + + // Broadcast new list to everyone in the lobby + io.emit('stream_list_update', Object.keys(streams)); }); - // B. Join Stream (Tail) - socket.on('join_stream', () => { - if (strand.length === 0) { - socket.emit('error_msg', "No active stream to join."); + // B. Join Existing Stream + socket.on('join_stream', (streamName) => { + const chain = streams[streamName]; + if (!chain) { + socket.emit('error_msg', "Stream does not exist."); return; } - const upstreamPeerId = strand[strand.length - 1]; - strand.push(socket.id); + const upstreamPeerId = chain[chain.length - 1]; + chain.push(socket.id); + socketStreamMap[socket.id] = streamName; - console.log(`User ${socket.id} joined. Upstream: ${upstreamPeerId}`); + console.log(`User ${socket.id} joined stream "${streamName}". Upstream: ${upstreamPeerId}`); - // 1. Tell Upstream to connect to ME + // Tell Upstream to connect to ME io.to(upstreamPeerId).emit('connect_to_downstream', { downstreamId: socket.id }); - // 2. Request Keyframe from Head (Delayed to allow connection setup) + // Request Keyframe (Targeting the Head of THIS stream) setTimeout(() => { - if (strand[0]) io.to(strand[0]).emit('request_keyframe'); + if (streams[streamName] && streams[streamName][0]) { + io.to(streams[streamName][0]).emit('request_keyframe'); + } }, 2000); }); @@ -76,112 +89,139 @@ io.on('connection', (socket) => { io.to(target).emit('signal_msg', { sender: socket.id, type, sdp, candidate }); }); - // D. Keyframe Relay - socket.on('relay_keyframe_upstream', () => { - // In a real robust app, you'd bubble this up the chain. - // Here we just blast the Source directly if we know who they are. - if (strand[0]) io.to(strand[0]).emit('request_keyframe'); + // D. Score Updates + socket.on('update_score', (score) => { + scores[socket.id] = score; }); - // E. Disconnects (Healing) + // E. Keyframe Relay + socket.on('relay_keyframe_upstream', () => { + const streamName = socketStreamMap[socket.id]; + if (streamName && streams[streamName]) { + io.to(streams[streamName][0]).emit('request_keyframe'); + } + }); + + // F. Disconnects (Healing) socket.on('disconnect', () => { - const index = strand.indexOf(socket.id); + delete scores[socket.id]; + const streamName = socketStreamMap[socket.id]; + + if (!streamName) return; // User wasn't in a stream + + const chain = streams[streamName]; + if (!chain) return; + + const index = chain.indexOf(socket.id); if (index === -1) return; - console.log(`User ${socket.id} disconnected. Index: ${index}`); + console.log(`User ${socket.id} left stream "${streamName}". Index: ${index}`); - // Head left + // Case 1: Head Left -> Destroy Stream if (index === 0) { - strand = []; - io.emit('stream_ended'); + delete streams[streamName]; + // Notify everyone in this stream that it ended + chain.forEach(peerId => { + if (peerId !== socket.id) io.to(peerId).emit('stream_ended'); + delete socketStreamMap[peerId]; + }); + io.emit('stream_list_update', Object.keys(streams)); // Update Lobby return; } - // Tail left - if (index === strand.length - 1) { - strand.pop(); - return; + // Case 2: Tail Left -> Just pop + if (index === chain.length - 1) { + chain.pop(); + const newTail = chain[chain.length - 1]; + if (newTail) io.to(newTail).emit('disconnect_downstream'); + } + // Case 3: Middle Left -> Stitch + else { + const upstreamNode = chain[index - 1]; + const downstreamNode = chain[index + 1]; + chain.splice(index, 1); // Remove node + + console.log(`Healing "${streamName}": ${upstreamNode} -> ${downstreamNode}`); + io.to(upstreamNode).emit('connect_to_downstream', { downstreamId: downstreamNode }); + + setTimeout(() => { + if (streams[streamName] && streams[streamName][0]) { + io.to(streams[streamName][0]).emit('request_keyframe'); + } + }, 2000); } - // Middle left (Stitch) - const upstreamNode = strand[index - 1]; - const downstreamNode = strand[index + 1]; - strand.splice(index, 1); + delete socketStreamMap[socket.id]; + }); - console.log(`Healing: ${upstreamNode} -> ${downstreamNode}`); - io.to(upstreamNode).emit('connect_to_downstream', { downstreamId: downstreamNode }); - - setTimeout(() => { - if (strand[0]) io.to(strand[0]).emit('request_keyframe'); - }, 2000); + socket.on('join_monitor', () => { + socket.join('monitors'); + // Send immediate state + socket.emit('monitor_update', { streams, scores }); }); }); -// --- THE OPTIMIZER LOOP --- +// 2. Broadcast State to Monitors (1Hz) +// We send the full topology and health scores every second. setInterval(() => { - if (strand.length < 3) return; // Need at least Head + 2 Nodes to swap anything + // Only emit if there is someone watching to save bandwidth + const monitorRoom = io.sockets.adapter.rooms.get('monitors'); + if (monitorRoom && monitorRoom.size > 0) { + io.to('monitors').emit('monitor_update', { streams, scores }); + } +}, 1000); - // We iterate from the 2nd node (Index 1) to the end. - // Index 0 is HEAD (Source), we never move the source. - let swapped = false; +// --- THE MULTI-STREAM OPTIMIZER --- +setInterval(() => { + // Loop through EVERY active stream + Object.keys(streams).forEach(streamName => { + const chain = streams[streamName]; - // We look for ONE swap per cycle to minimize chaos. - for (let i = 1; i < strand.length - 1; i++) { - const current = strand[i]; - const next = strand[i + 1]; + if (chain.length < 3) return; - const scoreCurrent = scores[current] || 0; - const scoreNext = scores[next] || 0; + // Scan this specific chain + for (let i = 1; i < chain.length - 1; i++) { + const current = chain[i]; + const next = chain[i + 1]; - // THRESHOLD: Only swap if the next guy is significantly stronger (+15 points) - // This prevents users flip-flopping constantly due to minor fluctuations. - if (scoreNext > scoreCurrent + 15) { - console.log(`Swapping Weak ${current} (${scoreCurrent}) with Strong ${next} (${scoreNext})`); + const scoreCurrent = scores[current] || 0; + const scoreNext = scores[next] || 0; - performSwap(i, i + 1); - swapped = true; - break; // Stop after one swap to let network settle + if (scoreNext > scoreCurrent + 15) { + console.log(`[${streamName}] Swapping ${current} (${scoreCurrent}) with ${next} (${scoreNext})`); + performSwap(chain, i, i + 1); + break; // One swap per stream per cycle + } } - } -}, 10000); // Run every 10 seconds + }); +}, 10000); -function performSwap(indexA, indexB) { - // 1. Update the Array (Logical Swap) - const nodeA = strand[indexA]; - const nodeB = strand[indexB]; - strand[indexA] = nodeB; - strand[indexB] = nodeA; +function performSwap(chain, indexA, indexB) { + const nodeA = chain[indexA]; + const nodeB = chain[indexB]; + chain[indexA] = nodeB; + chain[indexB] = nodeA; - // State after swap: - // [ ... -> Upstream -> NodeB -> NodeA -> Downstream -> ... ] + const upstreamNode = chain[indexA - 1]; + const downstreamNode = chain[indexB + 1]; - // Identify the relevant neighbors - const upstreamNode = strand[indexA - 1]; // The node before the pair - const downstreamNode = strand[indexB + 1]; // The node after the pair - - // 2. Instruct the Upstream to connect to the NEW first node (NodeB) - if (upstreamNode) { - io.to(upstreamNode).emit('connect_to_downstream', { downstreamId: nodeB }); - } - - // 3. Instruct NodeB to connect to NodeA (The internal link) + if (upstreamNode) io.to(upstreamNode).emit('connect_to_downstream', { downstreamId: nodeB }); io.to(nodeB).emit('connect_to_downstream', { downstreamId: nodeA }); - // 4. Instruct NodeA to connect to the Downstream (or null if end) if (downstreamNode) { io.to(nodeA).emit('connect_to_downstream', { downstreamId: downstreamNode }); } else { - // If NodeA is now the tail, it disconnects its downstream - io.to(nodeA).emit('disconnect_downstream'); // You might need to add this handler to client + io.to(nodeA).emit('disconnect_downstream'); } - // 5. Trigger Keyframes to heal the image + // Request Keyframe from Head setTimeout(() => { - if (strand[0]) io.to(strand[0]).emit('request_keyframe'); + if (chain[0]) io.to(chain[0]).emit('request_keyframe'); }, 1000); + setTimeout(() => { + if (chain[0]) io.to(chain[0]).emit('request_keyframe'); + }, 4000); } - - const PORT = process.env.PORT || 3000; server.listen(PORT, () => console.log(`Server running on http://localhost:${PORT}`)); \ No newline at end of file