diff --git a/package.json b/package.json index 078f8dd..5cf8a68 100644 --- a/package.json +++ b/package.json @@ -15,4 +15,4 @@ "express": "^5.2.1", "socket.io": "^4.8.1" } -} +} \ No newline at end of file diff --git a/public/client.js b/public/client.js index 434e016..4965d04 100644 --- a/public/client.js +++ b/public/client.js @@ -6,6 +6,8 @@ const streamListUl = document.getElementById('streamList'); const lobbyView = document.getElementById('lobby-view'); const streamView = document.getElementById('stream-view'); const streamTitle = document.getElementById('currentStreamTitle'); +const upstreamMetricsBox = document.getElementById('upstreamMetrics'); +const downstreamMetricsBox = document.getElementById('downstreamMetrics'); // --- Global State --- let localStream = null; @@ -15,20 +17,39 @@ let iceServers = []; let currentUpstreamPC = null; let currentDownstreamPC = null; let downstreamId = null; +let origVideoStream, + displayedVideoStream, + sentVideoStream, + origAudioStream, + displayedAudioStream, + sentAudioStream, + origVideoTrack, + origAudioTrack, + displayedVideoTrack, + displayedAudioTrack, + sentVideoTrack, + sentAudioTrack; // FADING connections let oldUpstreamPCs = []; let oldDownstreamPCs = []; +// Debug metrics history for bitrate/fps deltas +const metricsHistory = { + upstream: null, + downstream: null, +}; + // --- 1. Initialization --- async function init() { - try { - const response = await fetch('/api/get-turn-credentials'); - const data = await response.json(); - iceServers = data.iceServers; - } catch (e) { - iceServers = [{ urls: 'stun:stun.l.google.com:19302' }]; - } + try { + const response = await fetch('/api/get-turn-credentials'); + const data = await response.json(); + console.log('TURN data:', data); + iceServers = data.iceServers; + } catch (e) { + iceServers = [{ urls: 'stun:stun.l.google.com:19302' }]; + } } init(); @@ -36,255 +57,541 @@ init(); // Receive list of streams from server socket.on('stream_list_update', (streams) => { - streamListUl.innerHTML = ""; - if (streams.length === 0) { - streamListUl.innerHTML = "
  • No active streams. Start one!
  • "; - return; - } + streamListUl.innerHTML = ''; + if (streams.length === 0) { + streamListUl.innerHTML = "
  • No active streams. Start one!
  • "; + return; + } - streams.forEach(name => { - const li = document.createElement('li'); - li.innerText = `${name}`; - li.onclick = () => joinStream(name); - streamListUl.appendChild(li); - }); + streams.forEach((name) => { + const li = document.createElement('li'); + li.innerText = `${name}`; + li.onclick = () => joinStream(name); + streamListUl.appendChild(li); + }); }); async function startStream() { - const name = document.getElementById('streamNameInput').value; - const fileInput = document.getElementById('videoFileInput'); - const file = fileInput.files[0]; + const name = document.getElementById('streamNameInput').value; + const fileInput = document.getElementById('videoFileInput'); + const file = fileInput.files[0]; - if (!name) return alert("Please enter a name"); + if (!name) return alert('Please enter a name'); - try { - if (file) { - // --- OPTION A: VIDEO FILE MODE --- - console.log("Starting stream from video file..."); + try { + if (file) { + // --- OPTION A: VIDEO FILE MODE --- + console.log('Starting stream from video file...'); - // 1. Create a URL for the local file - const fileURL = URL.createObjectURL(file); + // 1. Create a URL for the local file + const fileURL = URL.createObjectURL(file); - // 2. Set it to the local video element - localVideo.src = fileURL; - localVideo.loop = true; // <--- HERE IS THE LOOP LOGIC - localVideo.muted = false; // Must be unmuted to capture audio, use headphones! - localVideo.volume = 1.0; + // 2. Set it to the local video element + localVideo.src = fileURL; + localVideo.loop = true; // <--- HERE IS THE LOOP LOGIC + localVideo.muted = false; // Must be unmuted to capture audio, use headphones! + localVideo.volume = 1.0; - // 3. Play the video (required before capturing) - await localVideo.play(); + // 3. Play the video (required before capturing) + await localVideo.play(); - // 4. Capture the stream from the video element - // (Chrome/Edge use captureStream, Firefox uses mozCaptureStream) - if (localVideo.captureStream) { - localStream = localVideo.captureStream(); - } else if (localVideo.mozCaptureStream) { - localStream = localVideo.mozCaptureStream(); - } else { - return alert("Your browser does not support capturing video from files."); - } + // 4. Capture the stream from the video element + // (Chrome/Edge use captureStream, Firefox uses mozCaptureStream) + if (localVideo.captureStream) { + localStream = localVideo.captureStream(); + } else { + return alert('Your browser does not support capturing video from files.'); + } - // Note: captureStream() sometimes doesn't capture audio if the element is muted. - // If you want to stream audio, you must hear it locally too. + // Note: captureStream() sometimes doesn't capture audio if the element is muted. + // If you want to stream audio, you must hear it locally too. + } else { + // --- OPTION B: WEBCAM MODE --- + console.log('Starting stream from Webcam...'); + localStream = await navigator.mediaDevices.getUserMedia({ + video: { width: { ideal: 4096 } }, + audio: true, + }); + localVideo.srcObject = localStream.clone(); + localVideo.muted = true; // Mute local webcam to avoid feedback loop + } - } else { - // --- OPTION B: WEBCAM MODE --- - console.log("Starting stream from Webcam..."); - localStream = await navigator.mediaDevices.getUserMedia({ video: true, audio: true }); - localVideo.srcObject = localStream; - localVideo.muted = true; // Mute local webcam to avoid feedback loop - } + // --- COMMON LOGIC --- - // --- COMMON LOGIC --- + // UI Switch + lobbyView.style.display = 'none'; + streamView.style.display = 'block'; + remoteVideo.style.display = 'none'; // Broadcaster doesn't see remote + streamTitle.innerText = `Broadcasting: ${name}`; - // UI Switch - lobbyView.style.display = 'none'; - streamView.style.display = 'block'; - remoteVideo.style.display = 'none'; // Broadcaster doesn't see remote - streamTitle.innerText = `Broadcasting: ${name}`; - - socket.emit('start_stream', name); - statusDiv.innerText = `Status: Broadcasting (Head) | ID: ${socket.id}`; - - } catch (err) { - console.error(err); - alert("Failed to start stream: " + err.message); - } + socket.emit('start_stream', name); + statusDiv.innerText = `Status: Broadcasting (Head) | ID: ${socket.id}`; + } catch (err) { + console.error(err); + alert('Failed to start stream: ' + err.message); + } } function joinStream(name) { - // UI Switch - lobbyView.style.display = 'none'; - streamView.style.display = 'block'; - localVideo.style.display = 'none'; // Viewers don't see themselves - streamTitle.innerText = `Watching: ${name}`; + // UI Switch + lobbyView.style.display = 'none'; + streamView.style.display = 'block'; + localVideo.style.display = 'none'; // Viewers don't see themselves + streamTitle.innerText = `Watching: ${name}`; - socket.emit('join_stream', name); - statusDiv.innerText = `Status: Joining chain... | ID: ${socket.id}`; + socket.emit('join_stream', name); + statusDiv.innerText = `Status: Joining chain... | ID: ${socket.id}`; } function leaveStream() { - location.reload(); // Simple way to reset state and go back to lobby + location.reload(); // Simple way to reset state and go back to lobby } - // --- 3. Health Reporting (Unchanged) --- setInterval(calculateAndReportHealth, 5000); +setInterval(pollPeerMetrics, 2000); async function calculateAndReportHealth() { - if (localStream) { - socket.emit('update_score', 100); - return; - } - if (!currentUpstreamPC) return; + if (localStream) { + socket.emit('update_score', 100); + return; + } + if (!currentUpstreamPC) return; - try { - const stats = await currentUpstreamPC.getStats(); - let packetsLost = 0; - let packetsReceived = 0; - let jitter = 0; + try { + const stats = await currentUpstreamPC.getStats(); + let packetsLost = 0; + let packetsReceived = 0; + let jitter = 0; - stats.forEach(report => { - if (report.type === 'inbound-rtp' && report.kind === 'video') { - packetsLost = report.packetsLost || 0; - packetsReceived = report.packetsReceived || 0; - jitter = report.jitter || 0; - } - }); + stats.forEach((report) => { + if (report.type === 'inbound-rtp' && report.kind === 'video') { + packetsLost = report.packetsLost || 0; + packetsReceived = report.packetsReceived || 0; + jitter = report.jitter || 0; + } + }); - const totalPackets = packetsReceived + packetsLost; - if (totalPackets === 0) return; + const totalPackets = packetsReceived + packetsLost; + if (totalPackets === 0) return; - const lossRate = packetsLost / totalPackets; - const score = 100 - (lossRate * 100 * 5) - (jitter * 100); - const finalScore = Math.max(0, Math.min(100, score)); - console.log(`Health: ${finalScore.toFixed(0)} | Loss: ${(lossRate * 100).toFixed(2)}%`); + const lossRate = packetsLost / totalPackets; + const score = 100 - lossRate * 100 * 5 - jitter * 100; + const finalScore = Math.max(0, Math.min(100, score)); + console.log(`Health: ${finalScore.toFixed(0)} | Loss: ${(lossRate * 100).toFixed(2)}%`); - socket.emit('update_score', finalScore); - } catch (e) { } + socket.emit('update_score', finalScore); + } catch (e) {} } // --- 4. Socket Events (Logic Updated for Smart Disconnect) --- socket.on('connect_to_downstream', async ({ downstreamId: targetId }) => { - downstreamId = targetId; - await setupDownstreamConnection(targetId); + downstreamId = targetId; + await setupDownstreamConnection(targetId); }); socket.on('disconnect_downstream', () => { - if (currentDownstreamPC) { - currentDownstreamPC.close(); - currentDownstreamPC = null; - downstreamId = null; - } - oldDownstreamPCs.forEach(pc => pc.close()); - oldDownstreamPCs = []; + if (currentDownstreamPC) { + currentDownstreamPC.close(); + currentDownstreamPC = null; + downstreamId = null; + } + oldDownstreamPCs.forEach((pc) => pc.close()); + oldDownstreamPCs = []; }); socket.on('signal_msg', async ({ sender, type, sdp, candidate }) => { - if (type === 'offer') { - await handleUpstreamOffer(sender, sdp); - } else if (type === 'answer') { - if (currentDownstreamPC && sender === downstreamId) { - await currentDownstreamPC.setRemoteDescription(new RTCSessionDescription(sdp)); - } - } else if (type === 'candidate') { - const ice = new RTCIceCandidate(candidate); - if (currentDownstreamPC && sender === downstreamId) currentDownstreamPC.addIceCandidate(ice).catch(e => { }); - if (currentUpstreamPC) currentUpstreamPC.addIceCandidate(ice).catch(e => { }); - oldUpstreamPCs.forEach(pc => pc.addIceCandidate(ice).catch(e => { })); - oldDownstreamPCs.forEach(pc => pc.addIceCandidate(ice).catch(e => { })); - } + if (type === 'offer') { + await handleUpstreamOffer(sender, sdp); + } else if (type === 'answer') { + if (currentDownstreamPC && sender === downstreamId) { + await currentDownstreamPC.setRemoteDescription(new RTCSessionDescription(sdp)); + } + } else if (type === 'candidate') { + const ice = new RTCIceCandidate(candidate); + if (currentDownstreamPC && sender === downstreamId) currentDownstreamPC.addIceCandidate(ice).catch((e) => {}); + if (currentUpstreamPC) currentUpstreamPC.addIceCandidate(ice).catch((e) => {}); + oldUpstreamPCs.forEach((pc) => pc.addIceCandidate(ice).catch((e) => {})); + oldDownstreamPCs.forEach((pc) => pc.addIceCandidate(ice).catch((e) => {})); + } }); socket.on('error_msg', (msg) => { - alert(msg); - location.reload(); + alert(msg); + location.reload(); }); socket.on('stream_ended', () => { - alert("Stream ended by host"); - location.reload(); + alert('Stream ended by host'); + location.reload(); }); socket.on('request_keyframe', () => { - console.log("Network requested keyframe"); - // If we were using Insertable streams, we'd need to handle this. - // With Standard API, the browser handles PLI automatically. + console.log('Network requested keyframe'); + // If we were using Insertable streams, we'd need to handle this. + // With Standard API, the browser handles PLI automatically. }); // --- 5. WebRTC Logic (Merged Smart Disconnect) --- async function handleUpstreamOffer(senderId, sdp) { - const newPC = new RTCPeerConnection({ iceServers }); + const newPC = new RTCPeerConnection({ iceServers }); - // Safety: If connection hangs, kill old one eventually - let safetyTimer = setTimeout(() => { - if (currentUpstreamPC && currentUpstreamPC !== newPC) { - currentUpstreamPC.close(); - } - }, 15000); + // Safety: If connection hangs, kill old one eventually + let safetyTimer = setTimeout(() => { + if (currentUpstreamPC && currentUpstreamPC !== newPC) { + currentUpstreamPC.close(); + } + }, 15000); - newPC.ontrack = (event) => { - clearTimeout(safetyTimer); // Success! + newPC.ontrack = async (event) => { + console.log('Received track from upstream:', event); + clearTimeout(safetyTimer); // Success! - remoteVideo.srcObject = event.streams[0]; - statusDiv.innerText = `Status: Connected | ID: ${socket.id}`; + if (event.track.kind === 'video') { + origVideoStream = event.streams[0]; + displayedVideoStream = origVideoStream.clone(); + sentVideoStream = origVideoStream.clone(); - if (currentDownstreamPC) { - const sender = currentDownstreamPC.getSenders().find(s => s.track && s.track.kind === event.track.kind); - if (sender) sender.replaceTrack(event.track); - } + origVideoTrack = event.track; + displayedVideoTrack = origVideoTrack.clone(); + sentVideoTrack = origVideoTrack.clone(); + } else if (event.track.kind === 'audio') { + origAudioStream = event.streams[0]; + displayedAudioStream = origAudioStream.clone(); + sentAudioStream = origAudioStream.clone(); - // Smart Disconnect: Old connection dies immediately upon success - if (currentUpstreamPC && currentUpstreamPC !== newPC) { - const oldPC = currentUpstreamPC; - setTimeout(() => { - oldPC.close(); - oldUpstreamPCs = oldUpstreamPCs.filter(pc => pc !== oldPC); - }, 1000); - } - currentUpstreamPC = newPC; - }; + origAudioTrack = event.track; + displayedAudioTrack = origAudioTrack.clone(); + sentAudioTrack = origAudioTrack.clone(); + } - newPC.onicecandidate = (event) => { - if (event.candidate) socket.emit('signal_msg', { target: senderId, type: 'candidate', candidate: event.candidate }); - }; + // Rebuild displayedStream + const displayedStream = new MediaStream(); + if (displayedVideoTrack) displayedStream.addTrack(displayedVideoTrack); + if (displayedAudioTrack) displayedStream.addTrack(displayedAudioTrack); - await newPC.setRemoteDescription(new RTCSessionDescription(sdp)); - const answer = await newPC.createAnswer(); - await newPC.setLocalDescription(answer); - socket.emit('signal_msg', { target: senderId, type: 'answer', sdp: answer }); + remoteVideo.srcObject = displayedStream; + statusDiv.innerText = `Status: Connected | ID: ${socket.id}`; + + if (currentDownstreamPC) { + console.log('Relaying new upstream stream to downstream'); + const videoSender = currentDownstreamPC.getSenders().find((s) => s.track && s.track.kind === 'video'); + const audioSender = currentDownstreamPC.getSenders().find((s) => s.track && s.track.kind === 'video'); + if (videoSender) await videoSender.replaceTrack(sentVideoTrack); + if (audioSender) await audioSender.replaceTrack(sentAudioTrack); + } + + // Smart Disconnect: Old connection dies immediately upon success + if (currentUpstreamPC && currentUpstreamPC !== newPC) { + const oldPC = currentUpstreamPC; + setTimeout(() => { + oldPC.close(); + oldUpstreamPCs = oldUpstreamPCs.filter((pc) => pc !== oldPC); + }, 1000); + } + currentUpstreamPC = newPC; + }; + + newPC.onicecandidate = (event) => { + if (event.candidate) socket.emit('signal_msg', { target: senderId, type: 'candidate', candidate: event.candidate }); + }; + + await newPC.setRemoteDescription(new RTCSessionDescription(sdp)); + const answer = await newPC.createAnswer(); + await newPC.setLocalDescription(answer); + socket.emit('signal_msg', { target: senderId, type: 'answer', sdp: answer }); } async function setupDownstreamConnection(targetId) { - if (currentDownstreamPC) { - const oldPC = currentDownstreamPC; - oldDownstreamPCs.push(oldPC); - setTimeout(() => { - oldPC.close(); - oldDownstreamPCs = oldDownstreamPCs.filter(pc => pc !== oldPC); - }, 5000); - } + if (currentDownstreamPC) { + const oldPC = currentDownstreamPC; + oldDownstreamPCs.push(oldPC); + setTimeout(() => { + oldPC.close(); + oldDownstreamPCs = oldDownstreamPCs.filter((pc) => pc !== oldPC); + }, 5000); + } - currentDownstreamPC = new RTCPeerConnection({ iceServers }); + currentDownstreamPC = new RTCPeerConnection({ iceServers }); - if (localStream) { - localStream.getTracks().forEach(track => currentDownstreamPC.addTrack(track, localStream)); - } else if (currentUpstreamPC) { - currentUpstreamPC.getReceivers().forEach(receiver => { - if (receiver.track) currentDownstreamPC.addTrack(receiver.track, remoteVideo.srcObject); - }); - } + if (localStream) { + console.log('Sending local stream tracks to downstream'); + localStream.getTracks().forEach((track) => currentDownstreamPC.addTrack(track, localStream)); + } else if (currentUpstreamPC) { + console.log('Relaying upstream stream tracks to downstream'); + // currentDownstreamPC.addTrack(sentVideoTrack, sentVideoStream); - currentDownstreamPC.onicecandidate = (event) => { - if (event.candidate) socket.emit('signal_msg', { target: targetId, type: 'candidate', candidate: event.candidate }); - }; + currentUpstreamPC.getReceivers().map((receiver) => { + console.log('Receiver track:', receiver.track); + if (!receiver.track) return; - const offer = await currentDownstreamPC.createOffer(); - offer.sdp = offer.sdp.replace(/b=AS:([0-9]+)/g, 'b=AS:4000'); - if (!offer.sdp.includes('b=AS:')) offer.sdp = offer.sdp.replace(/(m=video.*\r\n)/, '$1b=AS:4000\r\n'); + const sentTrack = receiver.track.kind === 'video' ? sentVideoTrack : sentAudioTrack; + const sentStream = receiver.track.kind === 'video' ? sentVideoStream : sentAudioStream; + currentDownstreamPC.addTrack(sentTrack, sentStream); + }); + } - await currentDownstreamPC.setLocalDescription(offer); - socket.emit('signal_msg', { target: targetId, type: 'offer', sdp: offer }); -} \ No newline at end of file + currentDownstreamPC.onicecandidate = (event) => { + if (event.candidate) socket.emit('signal_msg', { target: targetId, type: 'candidate', candidate: event.candidate }); + }; + + await Promise.all( + currentDownstreamPC.getSenders().map(async (sender) => { + const params = sender.getParameters(); + params.encodings = params.encodings.map((enc) => { + enc.maxBitrate = 200_000_000; + enc.maxFramerate = 60; + enc.scaleResolutionDownBy = 1.0; + enc.priority = 'high'; + return enc; + }); + params.degradationPreference = 'maintain-resolution'; + await sender.setParameters(params); + }) + ); + + const offer = await currentDownstreamPC.createOffer(); + // offer.sdp = offer.sdp.replace(/b=AS:([0-9]+)/g, 'b=AS:4000'); + // if (!offer.sdp.includes('b=AS:')) offer.sdp = offer.sdp.replace(/(m=video.*\r\n)/, '$1b=AS:4000\r\n'); + + await currentDownstreamPC.setLocalDescription(offer); + socket.emit('signal_msg', { target: targetId, type: 'offer', sdp: offer }); +} + +// --- 6. Debug Metrics (bitrate / loss / fps) --- + +async function pollPeerMetrics() { + try { + const upstreamResult = currentUpstreamPC + ? await collectInboundMetrics(currentUpstreamPC, metricsHistory.upstream) + : null; + const downstreamResult = currentDownstreamPC + ? await collectOutboundMetrics(currentDownstreamPC, metricsHistory.downstream) + : null; + + metricsHistory.upstream = upstreamResult + ? upstreamResult.snapshot + : currentUpstreamPC + ? metricsHistory.upstream + : null; + metricsHistory.downstream = downstreamResult + ? downstreamResult.snapshot + : currentDownstreamPC + ? metricsHistory.downstream + : null; + + renderMetrics(upstreamResult ? upstreamResult.display : null, downstreamResult ? downstreamResult.display : null); + + // Stream metrics to the monitor dashboard + socket.emit('report_metrics', { + inbound: upstreamResult ? upstreamResult.display : null, + outbound: downstreamResult ? downstreamResult.display : null, + }); + } catch (err) { + console.warn('Metrics poll failed', err); + } +} + +async function collectInboundMetrics(pc, previous) { + const stats = await pc.getStats(); + let inboundVideo = null; + let candidatePair = null; + + const tag = pc.__metricsTag || (pc.__metricsTag = Math.random().toString(36).slice(2)); + const prev = previous && previous.tag === tag ? previous : null; + + stats.forEach((report) => { + if (report.type === 'inbound-rtp' && report.kind === 'video' && !report.isRemote) inboundVideo = report; + if (report.type === 'candidate-pair' && report.state === 'succeeded' && report.nominated) candidatePair = report; + }); + + if (!inboundVideo) return null; + + const deltaMs = prev ? inboundVideo.timestamp - prev.timestamp : null; + const bytesDelta = prev ? inboundVideo.bytesReceived - prev.bytesReceived : null; + const bitrateKbps = deltaMs && deltaMs > 0 && bytesDelta >= 0 ? (bytesDelta * 8) / deltaMs : null; // timestamp is ms + const framesDelta = + prev && inboundVideo.framesDecoded !== undefined && prev.framesDecoded !== undefined + ? inboundVideo.framesDecoded - prev.framesDecoded + : null; + const fps = framesDelta !== null && deltaMs && deltaMs > 0 ? (framesDelta * 1000) / deltaMs : null; + + const packetLossPct = + inboundVideo.packetsLost !== undefined && inboundVideo.packetsReceived !== undefined + ? (inboundVideo.packetsLost / (inboundVideo.packetsReceived + inboundVideo.packetsLost)) * 100 + : null; + const jitterMs = inboundVideo.jitter !== undefined ? inboundVideo.jitter * 1000 : null; + const rttMs = + candidatePair && candidatePair.currentRoundTripTime !== undefined + ? candidatePair.currentRoundTripTime * 1000 + : null; + + const codecReport = inboundVideo.codecId && stats.get ? stats.get(inboundVideo.codecId) : null; + const codecLabel = codecReport ? codecReport.mimeType || codecReport.codecId || codecReport.sdpFmtpLine || '' : null; + + return { + display: { + bitrateKbps, + fps, + resolution: + inboundVideo.frameWidth && inboundVideo.frameHeight + ? `${inboundVideo.frameWidth}x${inboundVideo.frameHeight}` + : null, + packetLossPct, + jitterMs, + rttMs, + pli: inboundVideo.pliCount, + nack: inboundVideo.nackCount, + fir: inboundVideo.firCount, + framesDropped: inboundVideo.framesDropped, + codec: codecLabel, + state: pc.iceConnectionState || pc.connectionState, + }, + snapshot: { + timestamp: inboundVideo.timestamp, + bytesReceived: inboundVideo.bytesReceived || 0, + framesDecoded: inboundVideo.framesDecoded || 0, + tag, + }, + }; +} + +async function collectOutboundMetrics(pc, previous) { + const stats = await pc.getStats(); + let outboundVideo = null; + let remoteInbound = null; + let candidatePair = null; + + const tag = pc.__metricsTag || (pc.__metricsTag = Math.random().toString(36).slice(2)); + const prev = previous && previous.tag === tag ? previous : null; + + stats.forEach((report) => { + if (report.type === 'outbound-rtp' && report.kind === 'video' && !report.isRemote) outboundVideo = report; + if (report.type === 'remote-inbound-rtp' && report.kind === 'video') remoteInbound = report; + if (report.type === 'candidate-pair' && report.state === 'succeeded' && report.nominated) candidatePair = report; + }); + + if (!outboundVideo) return null; + + const deltaMs = prev ? outboundVideo.timestamp - prev.timestamp : null; + const bytesDelta = prev ? outboundVideo.bytesSent - prev.bytesSent : null; + const bitrateKbps = deltaMs && deltaMs > 0 && bytesDelta >= 0 ? (bytesDelta * 8) / deltaMs : null; + const framesDelta = + prev && outboundVideo.framesEncoded !== undefined && prev.framesEncoded !== undefined + ? outboundVideo.framesEncoded - prev.framesEncoded + : null; + const fps = framesDelta !== null && deltaMs && deltaMs > 0 ? (framesDelta * 1000) / deltaMs : null; + + let packetLossPct = null; + if (remoteInbound && remoteInbound.packetsLost !== undefined && remoteInbound.packetsReceived !== undefined) { + packetLossPct = (remoteInbound.packetsLost / (remoteInbound.packetsReceived + remoteInbound.packetsLost)) * 100; + } + + const rttMs = + candidatePair && candidatePair.currentRoundTripTime !== undefined + ? candidatePair.currentRoundTripTime * 1000 + : null; + const codecReport = outboundVideo.codecId && stats.get ? stats.get(outboundVideo.codecId) : null; + const codecLabel = codecReport ? codecReport.mimeType || codecReport.codecId || codecReport.sdpFmtpLine || '' : null; + + return { + display: { + bitrateKbps, + fps, + resolution: + outboundVideo.frameWidth && outboundVideo.frameHeight + ? `${outboundVideo.frameWidth}x${outboundVideo.frameHeight}` + : null, + packetLossPct, + rttMs, + qualityLimit: outboundVideo.qualityLimitationReason || 'none', + nack: outboundVideo.nackCount, + pli: outboundVideo.pliCount, + fir: outboundVideo.firCount, + retransmits: outboundVideo.retransmittedPacketsSent, + codec: codecLabel, + state: pc.iceConnectionState || pc.connectionState, + }, + snapshot: { + timestamp: outboundVideo.timestamp, + bytesSent: outboundVideo.bytesSent || 0, + framesEncoded: outboundVideo.framesEncoded || 0, + tag, + }, + }; +} + +function renderMetrics(inboundDisplay, outboundDisplay) { + if (!currentUpstreamPC) { + upstreamMetricsBox.innerHTML = 'No upstream peer (head broadcaster).'; + } else if (!inboundDisplay) { + upstreamMetricsBox.innerHTML = 'Collecting inbound stats...'; + } else { + upstreamMetricsBox.innerHTML = metricsLines([ + ['State', inboundDisplay.state || '--'], + ['Bitrate', formatBitrate(inboundDisplay.bitrateKbps)], + ['FPS', formatNumber(inboundDisplay.fps)], + ['Resolution', inboundDisplay.resolution || '--'], + ['Loss', formatPercent(inboundDisplay.packetLossPct)], + ['Jitter', formatMillis(inboundDisplay.jitterMs)], + ['RTT', formatMillis(inboundDisplay.rttMs)], + ['PLI/NACK/FIR', formatTriple(inboundDisplay.pli, inboundDisplay.nack, inboundDisplay.fir)], + ['Frames Dropped', formatCount(inboundDisplay.framesDropped)], + ['Codec', inboundDisplay.codec || '--'], + ]); + } + + if (!currentDownstreamPC) { + downstreamMetricsBox.innerHTML = 'No downstream peer connected.'; + } else if (!outboundDisplay) { + downstreamMetricsBox.innerHTML = 'Collecting outbound stats...'; + } else { + downstreamMetricsBox.innerHTML = metricsLines([ + ['State', outboundDisplay.state || '--'], + ['Bitrate', formatBitrate(outboundDisplay.bitrateKbps)], + ['FPS', formatNumber(outboundDisplay.fps)], + ['Resolution', outboundDisplay.resolution || '--'], + ['Loss (remote)', formatPercent(outboundDisplay.packetLossPct)], + ['RTT', formatMillis(outboundDisplay.rttMs)], + ['Quality Limit', outboundDisplay.qualityLimit || '--'], + ['PLI/NACK/FIR', formatTriple(outboundDisplay.pli, outboundDisplay.nack, outboundDisplay.fir)], + ['Retransmits', formatCount(outboundDisplay.retransmits)], + ['Codec', outboundDisplay.codec || '--'], + ]); + } +} + +function metricsLines(rows) { + return rows + .map(([label, value]) => `
    ${label}${value}
    `) + .join(''); +} + +function formatNumber(value, decimals = 1) { + return Number.isFinite(value) ? value.toFixed(decimals) : '--'; +} + +function formatBitrate(kbps) { + return Number.isFinite(kbps) ? `${kbps.toFixed(0)} kbps` : '--'; +} + +function formatPercent(value) { + return Number.isFinite(value) ? `${value.toFixed(1)}%` : '--'; +} + +function formatMillis(value) { + return Number.isFinite(value) ? `${value.toFixed(1)} ms` : '--'; +} + +function formatCount(value) { + return Number.isFinite(value) ? `${value}` : '--'; +} + +function formatTriple(a, b, c) { + const pa = Number.isFinite(a) ? a : '-'; + const pb = Number.isFinite(b) ? b : '-'; + const pc = Number.isFinite(c) ? c : '-'; + return `${pa}/${pb}/${pc}`; +} diff --git a/public/index.html b/public/index.html index be90293..1dcbacd 100644 --- a/public/index.html +++ b/public/index.html @@ -1,266 +1,318 @@ + + + + Strandcast + - + .navbar .brand { + margin-bottom: 10px; + } - - -
    -
    -

    Join a Broadcast

    -
    -

    Active Streams

    -
      -
    • Searching...
    • -
    -
    -
    -

    Go Live

    - -
    - -
    - - -
    -
    -
    -

    Stream Name

    - - -
    Status: Connecting...
    - -
    -
    - - - + .nav-links a { + margin: 0 10px; + } - \ No newline at end of file + .container { + padding: 10px; + } + + .card { + padding: 15px; + } + + h1 { + font-size: 20px; + } + } + + + + + +
    +
    +

    Join a Broadcast

    +
    +

    + Active Streams +

    +
      +
    • + Searching... +
    • +
    +
    +
    +

    + Go Live +

    + +
    + +
    + + +
    +
    +
    +

    Stream Name

    + + +
    Status: Connecting...
    +
    +
    +
    Inbound (from upstream)
    +
    Waiting for data...
    +
    +
    +
    Outbound (to downstream)
    +
    Waiting for data...
    +
    +
    + +
    +
    + + + + diff --git a/public/monitor.html b/public/monitor.html index 55bc8ae..0f96937 100644 --- a/public/monitor.html +++ b/public/monitor.html @@ -1,214 +1,285 @@ + + + + Strandcast Monitor + - + .meh { + border-color: #ffaa00; + color: #ffaa00; + } - - -

    Network Status

    -
    Connecting...
    - - - + .weak { + border-color: #ff3333; + color: #ff3333; + } - \ No newline at end of file + /* Arrows */ + .arrow { + font-size: 16px; + color: #555; + align-self: center; + } + + /* --- MOBILE MEDIA QUERY --- */ + @media (max-width: 600px) { + .navbar { + flex-direction: column; + padding: 10px; + } + + .navbar .brand { + margin-bottom: 10px; + } + + .nav-links a { + margin: 0 10px; + } + + #dashboard { + padding: 10px; + } + + h1 { + font-size: 20px; + } + } + + + + + +

    Network Status

    +
    Connecting...
    + + + + diff --git a/public/monitor.js b/public/monitor.js index 433a093..5c9d7ce 100644 --- a/public/monitor.js +++ b/public/monitor.js @@ -4,138 +4,179 @@ const dashboard = document.getElementById('dashboard'); // Identify as a monitor socket.emit('join_monitor'); -socket.on('monitor_update', ({ streams, scores }) => { - updateDashboard(streams, scores); +socket.on('monitor_update', ({ streams, scores, metrics }) => { + updateDashboard(streams, scores, metrics || {}); }); -function updateDashboard(streams, scores) { - const activeStreamNames = Object.keys(streams); +function updateDashboard(streams, scores, metrics) { + const activeStreamNames = Object.keys(streams); - // 1. Remove streams that no longer exist - const existingContainers = document.querySelectorAll('.stream-container'); - existingContainers.forEach(container => { - const name = container.getAttribute('data-stream'); - if (!streams[name]) { - container.remove(); - } - }); + // 1. Remove streams that no longer exist + const existingContainers = document.querySelectorAll('.stream-container'); + existingContainers.forEach((container) => { + const name = container.getAttribute('data-stream'); + if (!streams[name]) { + container.remove(); + } + }); - // 2. Add or Update streams - if (activeStreamNames.length === 0) { - // If empty, show message (only if not already showing it) - if (!dashboard.innerText.includes("No Active Streams")) { - dashboard.innerHTML = "
    No Active Streams
    "; - } - return; - } else { - // FIX: Check if we are still showing a text message (Connecting, No Active Streams, etc) - // If we have streams but no stream containers yet, clear the text. - if (!dashboard.querySelector('.stream-container')) { - dashboard.innerHTML = ""; - } - } + // 2. Add or Update streams + if (activeStreamNames.length === 0) { + // If empty, show message (only if not already showing it) + if (!dashboard.innerText.includes('No Active Streams')) { + dashboard.innerHTML = "
    No Active Streams
    "; + } + return; + } else { + // FIX: Check if we are still showing a text message (Connecting, No Active Streams, etc) + // If we have streams but no stream containers yet, clear the text. + if (!dashboard.querySelector('.stream-container')) { + dashboard.innerHTML = ''; + } + } - activeStreamNames.forEach(name => { - let container = document.getElementById(`stream-${name}`); - const chain = streams[name]; + activeStreamNames.forEach((name) => { + let container = document.getElementById(`stream-${name}`); + const chain = streams[name]; - // Create Container if it doesn't exist - if (!container) { - container = document.createElement('div'); - container.id = `stream-${name}`; - container.className = 'stream-container'; - container.setAttribute('data-stream', name); + // Create Container if it doesn't exist + if (!container) { + container = document.createElement('div'); + container.id = `stream-${name}`; + container.className = 'stream-container'; + container.setAttribute('data-stream', name); - // Structure - const title = document.createElement('div'); - title.className = 'stream-title'; + // Structure + const title = document.createElement('div'); + title.className = 'stream-title'; - const wrapper = document.createElement('div'); - wrapper.className = 'chain-wrapper'; + const wrapper = document.createElement('div'); + wrapper.className = 'chain-wrapper'; - const visual = document.createElement('div'); - visual.className = 'chain-visual'; + const visual = document.createElement('div'); + visual.className = 'chain-visual'; - wrapper.appendChild(visual); - container.appendChild(title); - container.appendChild(wrapper); - dashboard.appendChild(container); - } + wrapper.appendChild(visual); + container.appendChild(title); + container.appendChild(wrapper); + dashboard.appendChild(container); + } - // Update Title - const titleEl = container.querySelector('.stream-title'); - titleEl.innerText = `Stream: ${name} (${chain.length} nodes)`; + // Update Title + const titleEl = container.querySelector('.stream-title'); + titleEl.innerText = `Stream: ${name} (${chain.length} nodes)`; - // Update Nodes - updateChainVisual(container.querySelector('.chain-visual'), chain, scores); - }); + // Update Nodes + updateChainVisual(container.querySelector('.chain-visual'), chain, scores, metrics); + }); } -function updateChainVisual(visualContainer, chain, scores) { - const existingNodes = Array.from(visualContainer.querySelectorAll('.node-wrapper')); - const nodeMap = {}; - existingNodes.forEach(el => nodeMap[el.getAttribute('data-id')] = el); +function updateChainVisual(visualContainer, chain, scores, metrics) { + const existingNodes = Array.from(visualContainer.querySelectorAll('.node-wrapper')); + const nodeMap = {}; + existingNodes.forEach((el) => (nodeMap[el.getAttribute('data-id')] = el)); - const processedIds = new Set(); + const processedIds = new Set(); - chain.forEach((socketId, index) => { - processedIds.add(socketId); + chain.forEach((socketId, index) => { + processedIds.add(socketId); - const score = scores[socketId] !== undefined ? scores[socketId] : '??'; - let healthClass = 'meh'; - if (score >= 80) healthClass = 'healthy'; - if (score < 50) healthClass = 'weak'; - if (index === 0) healthClass = 'healthy'; + const score = scores[socketId] !== undefined ? scores[socketId] : '??'; + let healthClass = 'meh'; + if (score >= 80) healthClass = 'healthy'; + if (score < 50) healthClass = 'weak'; + if (index === 0) healthClass = 'healthy'; - const role = index === 0 ? "SOURCE" : (index === chain.length - 1 ? "VIEWER" : "RELAY"); - const shortId = socketId.substring(0, 4); + const role = index === 0 ? 'SOURCE' : index === chain.length - 1 ? 'VIEWER' : 'RELAY'; + const shortId = socketId.substring(0, 4); - let nodeWrapper = nodeMap[socketId]; + const metric = metrics[socketId] || {}; + let nodeWrapper = nodeMap[socketId]; - // --- CREATE --- - if (!nodeWrapper) { - nodeWrapper = document.createElement('div'); - nodeWrapper.className = 'node-wrapper'; - nodeWrapper.setAttribute('data-id', socketId); - nodeWrapper.style.display = 'flex'; - nodeWrapper.style.alignItems = 'center'; + // --- CREATE --- + if (!nodeWrapper) { + nodeWrapper = document.createElement('div'); + nodeWrapper.className = 'node-wrapper'; + nodeWrapper.setAttribute('data-id', socketId); - nodeWrapper.innerHTML = ` -
    -
    -
    -
    -
    -
    - `; + nodeWrapper.innerHTML = ` +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    +
    + `; - visualContainer.appendChild(nodeWrapper); - } + visualContainer.appendChild(nodeWrapper); + } - // --- UPDATE --- - // 1. Order (Preserve Scroll) - if (visualContainer.children[index] !== nodeWrapper) { - visualContainer.insertBefore(nodeWrapper, visualContainer.children[index]); - } + // --- UPDATE --- + // 1. Order (Preserve Scroll) + if (visualContainer.children[index] !== nodeWrapper) { + visualContainer.insertBefore(nodeWrapper, visualContainer.children[index]); + } - // 2. Arrow Visibility - const arrow = nodeWrapper.querySelector('.arrow'); - arrow.style.opacity = index === 0 ? "0" : "1"; + // 2. Arrow Visibility + const arrow = nodeWrapper.querySelector('.arrow'); + arrow.style.opacity = index === 0 ? '0' : '1'; - // 3. Data - const nodeEl = nodeWrapper.querySelector('.node'); - nodeEl.className = `node ${healthClass}`; + // 3. Data + const nodeEl = nodeWrapper.querySelector('.node'); + nodeEl.className = `node ${healthClass}`; - nodeWrapper.querySelector('.node-role').innerText = role; - nodeWrapper.querySelector('.node-id').innerText = shortId; - nodeWrapper.querySelector('.node-score').innerText = score; - }); + nodeWrapper.querySelector('.node-role').innerText = role; + nodeWrapper.querySelector('.node-id').innerText = shortId; + nodeWrapper.querySelector('.node-score').innerText = score; - // --- REMOVE --- - existingNodes.forEach(el => { - const id = el.getAttribute('data-id'); - if (!processedIds.has(id)) { - el.remove(); - } - }); -} \ No newline at end of file + const metricsBox = nodeWrapper.querySelector('.node-metrics'); + metricsBox.innerHTML = renderMetricsLines(metric.inbound, metric.outbound); + }); + + // --- REMOVE --- + existingNodes.forEach((el) => { + const id = el.getAttribute('data-id'); + if (!processedIds.has(id)) { + el.remove(); + } + }); +} + +function renderMetricsLines(inbound, outbound) { + const rows = []; + rows.push(metricRow('Inbound', inbound ? formatMetricLines(inbound, true) : ['—'])); + rows.push(metricRow('Outbound', outbound ? formatMetricLines(outbound, false) : ['—'])); + return `
    ${rows.join('')}
    `; +} + +function metricRow(label, valueLines) { + const valueHtml = valueLines.map((v) => `${v}`).join(''); + return `
    ${label}${valueHtml}
    `; +} + +function formatMetricLines(m, isInbound) { + const lines = []; + lines.push(fmtBitrate(m.bitrateKbps)); + lines.push(`loss ${fmtPercent(m.packetLossPct)}`); + lines.push(`fps ${fmtNumber(m.fps)}`); + if (m.resolution) lines.push(m.resolution); + if (!isInbound && m.qualityLimit) lines.push(`limit ${m.qualityLimit}`); + return lines; +} + +function fmtNumber(v) { + return Number.isFinite(v) ? v.toFixed(1) : '--'; +} + +function fmtBitrate(kbps) { + return Number.isFinite(kbps) ? `${kbps.toFixed(0)} kbps` : '--'; +} + +function fmtPercent(v) { + return Number.isFinite(v) ? `${v.toFixed(1)}%` : '--'; +} diff --git a/server.js b/server.js index 30e0fa9..0e74cd9 100644 --- a/server.js +++ b/server.js @@ -1,7 +1,7 @@ require('dotenv').config(); const express = require('express'); const http = require('http'); -const { Server } = require("socket.io"); +const { Server } = require('socket.io'); const axios = require('axios'); const app = express(); @@ -12,17 +12,18 @@ app.use(express.static('public')); // --- Cloudflare Credentials --- app.get('/api/get-turn-credentials', async (req, res) => { - try { - const response = await axios.post( - `https://rtc.live.cloudflare.com/v1/turn/keys/${process.env.CLOUDFLARE_APP_ID}/credentials/generate-ice-servers`, - { ttl: 86400 }, - { headers: { 'Authorization': `Bearer ${process.env.CLOUDFLARE_API_TOKEN}`, 'Content-Type': 'application/json' } } - ); - res.json(response.data); - } catch (error) { - console.error("Cloudflare Error:", error.message); - res.json({ iceServers: [{ urls: 'stun:stun.l.google.com:19302' }] }); - } + try { + // const response = await axios.post( + // `https://rtc.live.cloudflare.com/v1/turn/keys/${process.env.CLOUDFLARE_APP_ID}/credentials/generate-ice-servers`, + // { ttl: 86400 }, + // { headers: { 'Authorization': `Bearer ${process.env.CLOUDFLARE_API_TOKEN}`, 'Content-Type': 'application/json' } } + // ); + const response = await fetch(`https://strandcast.benglsoftware.com/api/get-turn-credentials`); + res.json(await response.json()); + } catch (error) { + console.error('Cloudflare Error:', error.message); + res.json({ iceServers: [{ urls: 'stun:stun.l.google.com:19302' }] }); + } }); // --- STATE MANAGEMENT --- @@ -30,198 +31,205 @@ app.get('/api/get-turn-credentials', async (req, res) => { const streams = {}; const socketStreamMap = {}; // Helper: socketId -> StreamName (for fast lookups on disconnect) const scores = {}; // socketId -> Health Score (0-100) +const metrics = {}; // socketId -> { inbound, outbound, ts } io.on('connection', (socket) => { - console.log(`User connected: ${socket.id}`); - scores[socket.id] = 80; + console.log(`User connected: ${socket.id}`); + scores[socket.id] = 80; - // 0. Send the list of streams to the new user - socket.emit('stream_list_update', Object.keys(streams)); + // 0. Send the list of streams to the new user + socket.emit('stream_list_update', Object.keys(streams)); - // A. Start New Stream - socket.on('start_stream', (streamName) => { - if (streams[streamName]) { - socket.emit('error_msg', "Stream name already exists."); - return; - } - if (socketStreamMap[socket.id]) { - socket.emit('error_msg', "You are already in a stream."); - return; - } + // A. Start New Stream + socket.on('start_stream', (streamName) => { + if (streams[streamName]) { + socket.emit('error_msg', 'Stream name already exists.'); + return; + } + if (socketStreamMap[socket.id]) { + socket.emit('error_msg', 'You are already in a stream.'); + return; + } - streams[streamName] = [socket.id]; - socketStreamMap[socket.id] = streamName; + streams[streamName] = [socket.id]; + socketStreamMap[socket.id] = streamName; - console.log(`Stream started: ${streamName} by ${socket.id}`); - socket.emit('role_assigned', 'head'); + console.log(`Stream started: ${streamName} by ${socket.id}`); + socket.emit('role_assigned', 'head'); - // Broadcast new list to everyone in the lobby - io.emit('stream_list_update', Object.keys(streams)); - }); + // Broadcast new list to everyone in the lobby + io.emit('stream_list_update', Object.keys(streams)); + }); - // B. Join Existing Stream - socket.on('join_stream', (streamName) => { - const chain = streams[streamName]; - if (!chain) { - socket.emit('error_msg', "Stream does not exist."); - return; - } + // B. Join Existing Stream + socket.on('join_stream', (streamName) => { + const chain = streams[streamName]; + if (!chain) { + socket.emit('error_msg', 'Stream does not exist.'); + return; + } - const upstreamPeerId = chain[chain.length - 1]; - chain.push(socket.id); - socketStreamMap[socket.id] = streamName; + const upstreamPeerId = chain[chain.length - 1]; + chain.push(socket.id); + socketStreamMap[socket.id] = streamName; - console.log(`User ${socket.id} joined stream "${streamName}". Upstream: ${upstreamPeerId}`); + console.log(`User ${socket.id} joined stream "${streamName}". Upstream: ${upstreamPeerId}`); - // Tell Upstream to connect to ME - io.to(upstreamPeerId).emit('connect_to_downstream', { downstreamId: socket.id }); + // Tell Upstream to connect to ME + io.to(upstreamPeerId).emit('connect_to_downstream', { downstreamId: socket.id }); - // Request Keyframe (Targeting the Head of THIS stream) - setTimeout(() => { - if (streams[streamName] && streams[streamName][0]) { - io.to(streams[streamName][0]).emit('request_keyframe'); - } - }, 2000); - }); + // Request Keyframe (Targeting the Head of THIS stream) + setTimeout(() => { + if (streams[streamName] && streams[streamName][0]) { + io.to(streams[streamName][0]).emit('request_keyframe'); + } + }, 2000); + }); - // C. Signaling - socket.on('signal_msg', ({ target, type, sdp, candidate }) => { - io.to(target).emit('signal_msg', { sender: socket.id, type, sdp, candidate }); - }); + // C. Signaling + socket.on('signal_msg', ({ target, type, sdp, candidate }) => { + io.to(target).emit('signal_msg', { sender: socket.id, type, sdp, candidate }); + }); - // D. Score Updates - socket.on('update_score', (score) => { - scores[socket.id] = score; - }); + // D. Score Updates + socket.on('update_score', (score) => { + scores[socket.id] = score; + }); - // E. Keyframe Relay - socket.on('relay_keyframe_upstream', () => { - const streamName = socketStreamMap[socket.id]; - if (streamName && streams[streamName]) { - io.to(streams[streamName][0]).emit('request_keyframe'); - } - }); + // D2. Metrics Updates + socket.on('report_metrics', (payload) => { + metrics[socket.id] = { ...payload, ts: Date.now() }; + }); - // F. Disconnects (Healing) - socket.on('disconnect', () => { - delete scores[socket.id]; - const streamName = socketStreamMap[socket.id]; + // E. Keyframe Relay + socket.on('relay_keyframe_upstream', () => { + const streamName = socketStreamMap[socket.id]; + if (streamName && streams[streamName]) { + io.to(streams[streamName][0]).emit('request_keyframe'); + } + }); - if (!streamName) return; // User wasn't in a stream + // F. Disconnects (Healing) + socket.on('disconnect', () => { + delete scores[socket.id]; + delete metrics[socket.id]; + const streamName = socketStreamMap[socket.id]; - const chain = streams[streamName]; - if (!chain) return; + if (!streamName) return; // User wasn't in a stream - const index = chain.indexOf(socket.id); - if (index === -1) return; + const chain = streams[streamName]; + if (!chain) return; - console.log(`User ${socket.id} left stream "${streamName}". Index: ${index}`); + const index = chain.indexOf(socket.id); + if (index === -1) return; - // Case 1: Head Left -> Destroy Stream - if (index === 0) { - delete streams[streamName]; - // Notify everyone in this stream that it ended - chain.forEach(peerId => { - if (peerId !== socket.id) io.to(peerId).emit('stream_ended'); - delete socketStreamMap[peerId]; - }); - io.emit('stream_list_update', Object.keys(streams)); // Update Lobby - return; - } + console.log(`User ${socket.id} left stream "${streamName}". Index: ${index}`); - // Case 2: Tail Left -> Just pop - if (index === chain.length - 1) { - chain.pop(); - const newTail = chain[chain.length - 1]; - if (newTail) io.to(newTail).emit('disconnect_downstream'); - } - // Case 3: Middle Left -> Stitch - else { - const upstreamNode = chain[index - 1]; - const downstreamNode = chain[index + 1]; - chain.splice(index, 1); // Remove node + // Case 1: Head Left -> Destroy Stream + if (index === 0) { + delete streams[streamName]; + // Notify everyone in this stream that it ended + chain.forEach((peerId) => { + if (peerId !== socket.id) io.to(peerId).emit('stream_ended'); + delete socketStreamMap[peerId]; + }); + io.emit('stream_list_update', Object.keys(streams)); // Update Lobby + return; + } - console.log(`Healing "${streamName}": ${upstreamNode} -> ${downstreamNode}`); - io.to(upstreamNode).emit('connect_to_downstream', { downstreamId: downstreamNode }); + // Case 2: Tail Left -> Just pop + if (index === chain.length - 1) { + chain.pop(); + const newTail = chain[chain.length - 1]; + if (newTail) io.to(newTail).emit('disconnect_downstream'); + } + // Case 3: Middle Left -> Stitch + else { + const upstreamNode = chain[index - 1]; + const downstreamNode = chain[index + 1]; + chain.splice(index, 1); // Remove node - setTimeout(() => { - if (streams[streamName] && streams[streamName][0]) { - io.to(streams[streamName][0]).emit('request_keyframe'); - } - }, 2000); - } + console.log(`Healing "${streamName}": ${upstreamNode} -> ${downstreamNode}`); + io.to(upstreamNode).emit('connect_to_downstream', { downstreamId: downstreamNode }); - delete socketStreamMap[socket.id]; - }); + setTimeout(() => { + if (streams[streamName] && streams[streamName][0]) { + io.to(streams[streamName][0]).emit('request_keyframe'); + } + }, 2000); + } - socket.on('join_monitor', () => { - socket.join('monitors'); - // Send immediate state - socket.emit('monitor_update', { streams, scores }); - }); + delete socketStreamMap[socket.id]; + }); + + socket.on('join_monitor', () => { + socket.join('monitors'); + // Send immediate state + socket.emit('monitor_update', { streams, scores, metrics }); + }); }); // 2. Broadcast State to Monitors (1Hz) // We send the full topology and health scores every second. setInterval(() => { - // Only emit if there is someone watching to save bandwidth - const monitorRoom = io.sockets.adapter.rooms.get('monitors'); - if (monitorRoom && monitorRoom.size > 0) { - io.to('monitors').emit('monitor_update', { streams, scores }); - } + // Only emit if there is someone watching to save bandwidth + const monitorRoom = io.sockets.adapter.rooms.get('monitors'); + if (monitorRoom && monitorRoom.size > 0) { + io.to('monitors').emit('monitor_update', { streams, scores, metrics }); + } }, 1000); // --- THE MULTI-STREAM OPTIMIZER --- setInterval(() => { - // Loop through EVERY active stream - Object.keys(streams).forEach(streamName => { - const chain = streams[streamName]; + // Loop through EVERY active stream + Object.keys(streams).forEach((streamName) => { + const chain = streams[streamName]; - if (chain.length < 3) return; + if (chain.length < 3) return; - // Scan this specific chain - for (let i = 1; i < chain.length - 1; i++) { - const current = chain[i]; - const next = chain[i + 1]; + // Scan this specific chain + for (let i = 1; i < chain.length - 1; i++) { + const current = chain[i]; + const next = chain[i + 1]; - const scoreCurrent = scores[current] || 0; - const scoreNext = scores[next] || 0; + const scoreCurrent = scores[current] || 0; + const scoreNext = scores[next] || 0; - if (scoreNext > scoreCurrent + 15) { - console.log(`[${streamName}] Swapping ${current} (${scoreCurrent}) with ${next} (${scoreNext})`); - performSwap(chain, i, i + 1); - break; // One swap per stream per cycle - } - } - }); + if (scoreNext > scoreCurrent + 15) { + console.log(`[${streamName}] Swapping ${current} (${scoreCurrent}) with ${next} (${scoreNext})`); + performSwap(chain, i, i + 1); + break; // One swap per stream per cycle + } + } + }); }, 10000); function performSwap(chain, indexA, indexB) { - const nodeA = chain[indexA]; - const nodeB = chain[indexB]; - chain[indexA] = nodeB; - chain[indexB] = nodeA; + const nodeA = chain[indexA]; + const nodeB = chain[indexB]; + chain[indexA] = nodeB; + chain[indexB] = nodeA; - const upstreamNode = chain[indexA - 1]; - const downstreamNode = chain[indexB + 1]; + const upstreamNode = chain[indexA - 1]; + const downstreamNode = chain[indexB + 1]; - if (upstreamNode) io.to(upstreamNode).emit('connect_to_downstream', { downstreamId: nodeB }); - io.to(nodeB).emit('connect_to_downstream', { downstreamId: nodeA }); + if (upstreamNode) io.to(upstreamNode).emit('connect_to_downstream', { downstreamId: nodeB }); + io.to(nodeB).emit('connect_to_downstream', { downstreamId: nodeA }); - if (downstreamNode) { - io.to(nodeA).emit('connect_to_downstream', { downstreamId: downstreamNode }); - } else { - io.to(nodeA).emit('disconnect_downstream'); - } + if (downstreamNode) { + io.to(nodeA).emit('connect_to_downstream', { downstreamId: downstreamNode }); + } else { + io.to(nodeA).emit('disconnect_downstream'); + } - // Request Keyframe from Head - setTimeout(() => { - if (chain[0]) io.to(chain[0]).emit('request_keyframe'); - }, 1000); - setTimeout(() => { - if (chain[0]) io.to(chain[0]).emit('request_keyframe'); - }, 4000); + // Request Keyframe from Head + setTimeout(() => { + if (chain[0]) io.to(chain[0]).emit('request_keyframe'); + }, 1000); + setTimeout(() => { + if (chain[0]) io.to(chain[0]).emit('request_keyframe'); + }, 4000); } const PORT = process.env.PORT || 3000; -server.listen(PORT, () => console.log(`Server running on http://localhost:${PORT}`)); \ No newline at end of file +server.listen(PORT, () => console.log(`Server running on http://localhost:${PORT}`));