From 4523ed7cc09b22a446968888e216dbb9b687631c Mon Sep 17 00:00:00 2001 From: Joey Eamigh <55670930+JoeyEamigh@users.noreply.github.com> Date: Wed, 10 Dec 2025 22:08:06 -0500 Subject: [PATCH] use datachannels instead of video lmao why does this work --- public/client.js | 578 ++++++++++++++++++++++++++++++++++++++++++---- public/monitor.js | 31 ++- 2 files changed, 557 insertions(+), 52 deletions(-) diff --git a/public/client.js b/public/client.js index 4965d04..3a5bc08 100644 --- a/public/client.js +++ b/public/client.js @@ -12,6 +12,49 @@ const downstreamMetricsBox = document.getElementById('downstreamMetrics'); // --- Global State --- let localStream = null; let iceServers = []; +const useDataChannelVideo = true; // single-encode fanout +let isHeadNode = false; + +// Datachannel video pipeline +let videoProcessor; +let videoReader; +let videoEncoder; +let videoDecoder; +let videoGenerator; +let videoWriter; +let videoDecoderConfigured = false; +let decoderConfigCache = null; +let upstreamVideoChannel = null; +const downstreamVideoChannels = new Map(); +let lastVideoChunkTs = 0n; +const CHUNK_HEADER_SIZE = 14; // 1(type)+1(chunk type)+8(ts)+4(duration) +let supportedEncoding = []; +let supportedDecoding = []; +let supportedCodecs = []; +let hasReceivedKeyframe = false; + +// Datachannel metrics (bytes/frames based) +const dcStats = { + video: { + txBytes: 0, + rxBytes: 0, + txFrames: 0, + rxFrames: 0, + lastSampleTs: null, + lastTxBytes: 0, + lastRxBytes: 0, + lastTxFrames: 0, + lastRxFrames: 0, + bitrateKbpsTx: null, + bitrateKbpsRx: null, + fpsTx: null, + fpsRx: null, + }, +}; + +// Simple backpressure guard +let encoderBusy = false; +let decodeQueue = 0; // ACTIVE connections let currentUpstreamPC = null; @@ -50,6 +93,44 @@ async function init() { } catch (e) { iceServers = [{ urls: 'stun:stun.l.google.com:19302' }]; } + + // test codecs + const codecs = ['avc1.420034', 'hvc1.1.6.L123.00', 'vp8', 'vp09.00.10.08', 'av01.0.04M.08']; + const accelerations = ['prefer-hardware', 'prefer-software']; + + const configs = []; + for (const codec of codecs) { + for (const acceleration of accelerations) { + configs.push({ + codec, + hardwareAcceleration: acceleration, + width: 1920, + height: 1080, + bitrate: 4_000_000, + bitrateMode: 'variable', + framerate: 30, + }); + } + } + + for (const config of configs) { + const support = await VideoEncoder.isConfigSupported(config); + console.log(`VideoEncoder's config ${JSON.stringify(support.config)} support: ${support.supported}`); + if (support.supported) + supportedEncoding.push({ codec: config.codec, hardwareAcceleration: config.hardwareAcceleration }); + } + + for (const config of configs) { + const support = await VideoDecoder.isConfigSupported(config); + console.log(`VideoDecoder's config ${JSON.stringify(support.config)} support: ${support.supported}`); + if (support.supported) + supportedEncoding.push({ codec: config.codec, hardwareAcceleration: config.hardwareAcceleration }); + } + + supportedCodecs = supportedEncoding.filter((enc) => + supportedDecoding.some((dec) => dec.codec === enc.codec && dec.hardwareAcceleration === enc.hardwareAcceleration) + ); + console.log('Supported codecs for datachannel video:', supportedCodecs); } init(); @@ -124,6 +205,11 @@ async function startStream() { remoteVideo.style.display = 'none'; // Broadcaster doesn't see remote streamTitle.innerText = `Broadcasting: ${name}`; + isHeadNode = true; + if (useDataChannelVideo) { + await startHeadVideoEncoding(); + } + socket.emit('start_stream', name); statusDiv.innerText = `Status: Broadcasting (Head) | ID: ${socket.id}`; } catch (err) { @@ -147,11 +233,306 @@ function leaveStream() { location.reload(); // Simple way to reset state and go back to lobby } +// --- Datachannel Video Helpers --- +async function startHeadVideoEncoding() { + if (typeof VideoEncoder === 'undefined') { + alert('VideoEncoder API not supported in this browser.'); + return; + } + if (!localStream) return; + const videoTrack = localStream.getVideoTracks()[0]; + if (!videoTrack) return; + + cleanupDataChannelVideo(); + + videoProcessor = new MediaStreamTrackProcessor({ track: videoTrack }); + videoReader = videoProcessor.readable.getReader(); + + const settings = videoTrack.getSettings(); + const width = settings.width || 1920; + const height = settings.height || 1080; + const framerate = settings.frameRate || 30; + + const candidates = supportedCodecs.map((s) => ({ + codec: s.codec, + hardwareAcceleration: s.hardwareAcceleration, + width, + height, + bitrate: 4_000_000, + bitrateMode: 'variable', + framerate, + })); + + let chosenConfig = null; + for (const cfg of candidates) { + const support = await VideoEncoder.isConfigSupported(cfg); + console.log('Encoder config support check', JSON.stringify(cfg.codec, null, 2), JSON.stringify(support, null, 2)); + if (support.supported) { + chosenConfig = support.config; + break; + } + } + + if (!chosenConfig) { + // fall back to vp09.00.10.08 software + chosenConfig = { + codec: 'vp09.00.10.08', + hardwareAcceleration: 'prefer-software', + width, + height, + bitrate: 4_000_000, + bitrateMode: 'variable', + framerate, + }; + } + + videoEncoder = new VideoEncoder({ + output: handleEncodedChunk, + error: (e) => console.error('VideoEncoder error', e), + }); + + videoEncoder.configure(chosenConfig); + decoderConfigCache = { + codec: chosenConfig.codec, + codedWidth: chosenConfig.width, + codedHeight: chosenConfig.height, + hardwareAcceleration: 'prefer-hardware', + }; + + pumpVideoEncodeLoop(); +} + +async function pumpVideoEncodeLoop() { + if (!videoReader || !videoEncoder) return; + let frameCount = 0; + while (videoReader && videoEncoder && videoEncoder.state === 'configured') { + const { value: frame, done } = await videoReader.read(); + if (done || !frame) break; + const keyFrame = frameCount % 60 === 0; // force a keyframe roughly every 2s at 30fps + try { + videoEncoder.encode(frame, { keyFrame }); + } catch (err) { + console.warn('encode failed', err); + frame.close(); + break; + } + frame.close(); + frameCount += 1; + } +} + +function handleEncodedChunk(chunk, metadata) { + if (metadata && metadata.decoderConfig) { + decoderConfigCache = metadata.decoderConfig; + broadcastDecoderConfig(); + configureDecoderIfNeeded(decoderConfigCache); + } + + const payload = packEncodedChunk(chunk); + dcStats.video.txBytes += payload.byteLength; + dcStats.video.txFrames += 1; + broadcastVideoPayload(payload); +} + +function packEncodedChunk(chunk) { + const total = CHUNK_HEADER_SIZE + chunk.byteLength; + const buf = new ArrayBuffer(total); + const view = new DataView(buf); + view.setUint8(0, 1); // message type: chunk + view.setUint8(1, chunk.type === 'key' ? 1 : 0); + view.setBigUint64(2, BigInt(chunk.timestamp)); + view.setUint32(10, chunk.duration ? Number(chunk.duration) : 0); + chunk.copyTo(new Uint8Array(buf, CHUNK_HEADER_SIZE)); + lastVideoChunkTs = BigInt(chunk.timestamp); + return buf; +} + +function broadcastDecoderConfig() { + if (!decoderConfigCache) return; + const message = JSON.stringify({ type: 'config', config: decoderConfigCache }); + downstreamVideoChannels.forEach((dc) => { + if (dc.readyState === 'open') dc.send(message); + }); +} + +function broadcastVideoPayload(payload, exclude) { + downstreamVideoChannels.forEach((dc) => { + if (dc === exclude) return; + if (dc.readyState === 'open') dc.send(payload); + }); +} + +function setupUpstreamVideoChannel(channel) { + upstreamVideoChannel = channel; + channel.binaryType = 'arraybuffer'; + channel.onmessage = async (evt) => { + await handleVideoDataMessage(evt.data, channel); + }; + channel.onclose = () => { + upstreamVideoChannel = null; + }; + channel.onerror = (e) => console.warn('Upstream video channel error', e); +} + +function setupDownstreamVideoChannel(targetId, channel) { + channel.binaryType = 'arraybuffer'; + channel.onopen = () => { + downstreamVideoChannels.set(targetId, channel); + if (decoderConfigCache) channel.send(JSON.stringify({ type: 'config', config: decoderConfigCache })); + }; + channel.onclose = () => { + downstreamVideoChannels.delete(targetId); + }; + channel.onerror = (e) => console.warn('Downstream video channel error', e); +} + +async function handleVideoDataMessage(data, inboundChannel) { + if (typeof data === 'string') { + try { + const msg = JSON.parse(data); + if (msg.type === 'config') { + decoderConfigCache = msg.config; + configureDecoderIfNeeded(decoderConfigCache); + broadcastDecoderConfig(); + } + } catch (err) { + console.warn('Failed to parse video config', err); + } + return; + } + + const buffer = data instanceof ArrayBuffer ? data : await data.arrayBuffer(); + const chunk = unpackEncodedChunk(buffer); + if (!chunk) return; + if (!hasReceivedKeyframe && chunk.type === 'delta') { + // Ignore delta frames until we see a keyframe after configure/reset + return; + } + if (chunk.type === 'key') { + hasReceivedKeyframe = true; + } + dcStats.video.rxBytes += buffer.byteLength; + dcStats.video.rxFrames += 1; + // If we have downstreams, count this as outbound too (relay fanout) + if (downstreamVideoChannels.size > 0) { + dcStats.video.txBytes += buffer.byteLength; + dcStats.video.txFrames += 1; + } + if (videoDecoder) { + decodeQueue += 1; + videoDecoder.decode(chunk); + } + broadcastVideoPayload(buffer, inboundChannel); +} + +function unpackEncodedChunk(buffer) { + if (!buffer || buffer.byteLength <= CHUNK_HEADER_SIZE) return null; + const view = new DataView(buffer); + if (view.getUint8(0) !== 1) return null; + const chunkType = view.getUint8(1) === 1 ? 'key' : 'delta'; + const timestamp = Number(view.getBigUint64(2)); + const duration = view.getUint32(10); + const data = new Uint8Array(buffer, CHUNK_HEADER_SIZE); + return new EncodedVideoChunk({ type: chunkType, timestamp, duration, data }); +} + +async function configureDecoderIfNeeded(config) { + if (typeof VideoDecoder === 'undefined') { + console.warn('VideoDecoder API not supported'); + return; + } + if (videoDecoderConfigured || !config) return; + videoDecoder = new VideoDecoder({ + output: handleDecodedFrame, + error: (e) => console.error('VideoDecoder error', e), + }); + + const support = await VideoDecoder.isConfigSupported(config).catch(() => ({ supported: false })); + if (!support.supported) { + console.warn('Decoder config unsupported', config); + return; + } + videoDecoder.configure(config); + videoDecoderConfigured = true; + hasReceivedKeyframe = false; + ensureRenderedStream(); +} + +function ensureRenderedStream() { + if (videoGenerator && videoWriter) return; + videoGenerator = new MediaStreamTrackGenerator({ kind: 'video' }); + videoWriter = videoGenerator.writable.getWriter(); + let ms = remoteVideo.srcObject instanceof MediaStream ? remoteVideo.srcObject : new MediaStream(); + try { + ms.addTrack(videoGenerator); + } catch (e) { + ms = new MediaStream([videoGenerator]); + } + remoteVideo.srcObject = ms; + remoteVideo.style.display = 'block'; +} + +function handleDecodedFrame(frame) { + decodeQueue = Math.max(0, decodeQueue - 1); + if (!videoWriter) { + frame.close(); + return; + } + videoWriter.write(frame).catch((err) => { + console.warn('Failed to write decoded frame', err); + frame.close(); + }); +} + +function cleanupDataChannelVideo() { + if (videoReader) { + try { + videoReader.cancel(); + } catch (e) {} + } + videoReader = null; + videoProcessor = null; + if (videoEncoder) { + try { + videoEncoder.flush().catch(() => {}); + videoEncoder.close(); + } catch (e) {} + } + videoEncoder = null; + if (videoDecoder) { + try { + videoDecoder.close(); + } catch (e) {} + } + videoDecoder = null; + videoDecoderConfigured = false; + decoderConfigCache = null; + hasReceivedKeyframe = false; + if (videoWriter) { + try { + videoWriter.close(); + } catch (e) {} + } + videoWriter = null; + videoGenerator = null; + if (upstreamVideoChannel) { + upstreamVideoChannel.close(); + upstreamVideoChannel = null; + } + downstreamVideoChannels.forEach((dc) => dc.close()); + downstreamVideoChannels.clear(); + lastVideoChunkTs = 0n; +} + // --- 3. Health Reporting (Unchanged) --- setInterval(calculateAndReportHealth, 5000); setInterval(pollPeerMetrics, 2000); async function calculateAndReportHealth() { + if (useDataChannelVideo) { + socket.emit('update_score', 100); + return; + } if (localStream) { socket.emit('update_score', 100); return; @@ -245,11 +626,22 @@ async function handleUpstreamOffer(senderId, sdp) { } }, 15000); + newPC.ondatachannel = (event) => { + if (event.channel.label === 'video') { + console.log('Upstream video datachannel attached'); + setupUpstreamVideoChannel(event.channel); + } + }; + newPC.ontrack = async (event) => { console.log('Received track from upstream:', event); clearTimeout(safetyTimer); // Success! if (event.track.kind === 'video') { + if (useDataChannelVideo) { + console.log('Ignoring incoming video track; datachannel video active'); + return; + } origVideoStream = event.streams[0]; displayedVideoStream = origVideoStream.clone(); sentVideoStream = origVideoStream.clone(); @@ -271,14 +663,25 @@ async function handleUpstreamOffer(senderId, sdp) { const displayedStream = new MediaStream(); if (displayedVideoTrack) displayedStream.addTrack(displayedVideoTrack); if (displayedAudioTrack) displayedStream.addTrack(displayedAudioTrack); - - remoteVideo.srcObject = displayedStream; + if (!useDataChannelVideo) { + remoteVideo.srcObject = displayedStream; + } else if (displayedAudioTrack) { + if (!remoteVideo.srcObject) { + remoteVideo.srcObject = new MediaStream([displayedAudioTrack]); + } else { + try { + remoteVideo.srcObject.addTrack(displayedAudioTrack); + } catch (e) { + remoteVideo.srcObject = new MediaStream([displayedAudioTrack]); + } + } + } statusDiv.innerText = `Status: Connected | ID: ${socket.id}`; if (currentDownstreamPC) { console.log('Relaying new upstream stream to downstream'); const videoSender = currentDownstreamPC.getSenders().find((s) => s.track && s.track.kind === 'video'); - const audioSender = currentDownstreamPC.getSenders().find((s) => s.track && s.track.kind === 'video'); + const audioSender = currentDownstreamPC.getSenders().find((s) => s.track && s.track.kind === 'audio'); if (videoSender) await videoSender.replaceTrack(sentVideoTrack); if (audioSender) await audioSender.replaceTrack(sentAudioTrack); } @@ -316,20 +719,30 @@ async function setupDownstreamConnection(targetId) { currentDownstreamPC = new RTCPeerConnection({ iceServers }); + if (useDataChannelVideo) { + const videoDC = currentDownstreamPC.createDataChannel('video'); + setupDownstreamVideoChannel(targetId, videoDC); + } + if (localStream) { console.log('Sending local stream tracks to downstream'); - localStream.getTracks().forEach((track) => currentDownstreamPC.addTrack(track, localStream)); + localStream.getAudioTracks().forEach((track) => currentDownstreamPC.addTrack(track.clone(), localStream)); + if (!useDataChannelVideo) { + localStream.getVideoTracks().forEach((track) => currentDownstreamPC.addTrack(track.clone(), localStream)); + } } else if (currentUpstreamPC) { console.log('Relaying upstream stream tracks to downstream'); - // currentDownstreamPC.addTrack(sentVideoTrack, sentVideoStream); - - currentUpstreamPC.getReceivers().map((receiver) => { - console.log('Receiver track:', receiver.track); + currentUpstreamPC.getReceivers().forEach((receiver) => { if (!receiver.track) return; - - const sentTrack = receiver.track.kind === 'video' ? sentVideoTrack : sentAudioTrack; - const sentStream = receiver.track.kind === 'video' ? sentVideoStream : sentAudioStream; - currentDownstreamPC.addTrack(sentTrack, sentStream); + if (receiver.track.kind === 'audio') { + const cloned = receiver.track.clone(); + const stream = new MediaStream([cloned]); + currentDownstreamPC.addTrack(cloned, stream); + } else if (!useDataChannelVideo) { + const sentTrack = sentVideoTrack || receiver.track.clone(); + const sentStream = sentVideoStream || new MediaStream([sentTrack]); + currentDownstreamPC.addTrack(sentTrack, sentStream); + } }); } @@ -338,18 +751,21 @@ async function setupDownstreamConnection(targetId) { }; await Promise.all( - currentDownstreamPC.getSenders().map(async (sender) => { - const params = sender.getParameters(); - params.encodings = params.encodings.map((enc) => { - enc.maxBitrate = 200_000_000; - enc.maxFramerate = 60; - enc.scaleResolutionDownBy = 1.0; - enc.priority = 'high'; - return enc; - }); - params.degradationPreference = 'maintain-resolution'; - await sender.setParameters(params); - }) + currentDownstreamPC + .getSenders() + .filter((sender) => sender.track && sender.track.kind === 'video') + .map(async (sender) => { + const params = sender.getParameters(); + params.encodings = params.encodings.map((enc) => { + enc.maxBitrate = 200_000_000; + enc.maxFramerate = 60; + enc.scaleResolutionDownBy = 1.0; + enc.priority = 'high'; + return enc; + }); + params.degradationPreference = 'maintain-resolution'; + await sender.setParameters(params); + }) ); const offer = await currentDownstreamPC.createOffer(); @@ -364,31 +780,37 @@ async function setupDownstreamConnection(targetId) { async function pollPeerMetrics() { try { - const upstreamResult = currentUpstreamPC - ? await collectInboundMetrics(currentUpstreamPC, metricsHistory.upstream) - : null; - const downstreamResult = currentDownstreamPC - ? await collectOutboundMetrics(currentDownstreamPC, metricsHistory.downstream) - : null; + if (useDataChannelVideo) { + const dcDisplay = computeDataChannelMetrics(); + renderDataChannelMetrics(dcDisplay); + socket.emit('report_metrics', { inbound: dcDisplay.inbound, outbound: dcDisplay.outbound }); + } else { + const upstreamResult = currentUpstreamPC + ? await collectInboundMetrics(currentUpstreamPC, metricsHistory.upstream) + : null; + const downstreamResult = currentDownstreamPC + ? await collectOutboundMetrics(currentDownstreamPC, metricsHistory.downstream) + : null; - metricsHistory.upstream = upstreamResult - ? upstreamResult.snapshot - : currentUpstreamPC - ? metricsHistory.upstream - : null; - metricsHistory.downstream = downstreamResult - ? downstreamResult.snapshot - : currentDownstreamPC - ? metricsHistory.downstream - : null; + metricsHistory.upstream = upstreamResult + ? upstreamResult.snapshot + : currentUpstreamPC + ? metricsHistory.upstream + : null; + metricsHistory.downstream = downstreamResult + ? downstreamResult.snapshot + : currentDownstreamPC + ? metricsHistory.downstream + : null; - renderMetrics(upstreamResult ? upstreamResult.display : null, downstreamResult ? downstreamResult.display : null); + renderMetrics(upstreamResult ? upstreamResult.display : null, downstreamResult ? downstreamResult.display : null); - // Stream metrics to the monitor dashboard - socket.emit('report_metrics', { - inbound: upstreamResult ? upstreamResult.display : null, - outbound: downstreamResult ? downstreamResult.display : null, - }); + // Stream metrics to the monitor dashboard + socket.emit('report_metrics', { + inbound: upstreamResult ? upstreamResult.display : null, + outbound: downstreamResult ? downstreamResult.display : null, + }); + } } catch (err) { console.warn('Metrics poll failed', err); } @@ -524,6 +946,9 @@ async function collectOutboundMetrics(pc, previous) { } function renderMetrics(inboundDisplay, outboundDisplay) { + if (useDataChannelVideo) { + return; + } if (!currentUpstreamPC) { upstreamMetricsBox.innerHTML = 'No upstream peer (head broadcaster).'; } else if (!inboundDisplay) { @@ -595,3 +1020,64 @@ function formatTriple(a, b, c) { const pc = Number.isFinite(c) ? c : '-'; return `${pa}/${pb}/${pc}`; } + +function computeDataChannelMetrics() { + const now = performance.now(); + const v = dcStats.video; + if (v.lastSampleTs === null) { + v.lastSampleTs = now; + v.lastTxBytes = v.txBytes; + v.lastRxBytes = v.rxBytes; + v.lastTxFrames = v.txFrames; + v.lastRxFrames = v.rxFrames; + } + const deltaMs = now - v.lastSampleTs; + if (deltaMs > 0) { + const txBytesDelta = v.txBytes - v.lastTxBytes; + const rxBytesDelta = v.rxBytes - v.lastRxBytes; + const txFramesDelta = v.txFrames - v.lastTxFrames; + const rxFramesDelta = v.rxFrames - v.lastRxFrames; + v.bitrateKbpsTx = txBytesDelta > 0 ? (txBytesDelta * 8) / deltaMs : 0; + v.bitrateKbpsRx = rxBytesDelta > 0 ? (rxBytesDelta * 8) / deltaMs : 0; + v.fpsTx = txFramesDelta > 0 ? (txFramesDelta * 1000) / deltaMs : 0; + v.fpsRx = rxFramesDelta > 0 ? (rxFramesDelta * 1000) / deltaMs : 0; + v.lastSampleTs = now; + v.lastTxBytes = v.txBytes; + v.lastRxBytes = v.rxBytes; + v.lastTxFrames = v.txFrames; + v.lastRxFrames = v.rxFrames; + } + return { + inbound: { + bitrateKbps: v.bitrateKbpsRx, + fps: v.fpsRx, + resolution: decoderConfigCache ? `${decoderConfigCache.codedWidth}x${decoderConfigCache.codedHeight}` : null, + state: 'datachannel', + bytes: v.rxBytes, + frames: v.rxFrames, + }, + outbound: { + bitrateKbps: v.bitrateKbpsTx, + fps: v.fpsTx, + resolution: decoderConfigCache ? `${decoderConfigCache.codedWidth}x${decoderConfigCache.codedHeight}` : null, + state: 'datachannel', + bytes: v.txBytes, + frames: v.txFrames, + }, + }; +} + +function renderDataChannelMetrics(display) { + upstreamMetricsBox.innerHTML = metricsLines([ + ['Path', 'DataChannel'], + ['Bitrate RX', formatBitrate(display.inbound.bitrateKbps)], + ['FPS RX', formatNumber(display.inbound.fps)], + ['Resolution', display.inbound.resolution || '--'], + ]); + downstreamMetricsBox.innerHTML = metricsLines([ + ['Path', 'DataChannel'], + ['Bitrate TX', formatBitrate(display.outbound.bitrateKbps)], + ['FPS TX', formatNumber(display.outbound.fps)], + ['Resolution', display.outbound.resolution || '--'], + ]); +} diff --git a/public/monitor.js b/public/monitor.js index 5c9d7ce..f6c163f 100644 --- a/public/monitor.js +++ b/public/monitor.js @@ -149,8 +149,8 @@ function updateChainVisual(visualContainer, chain, scores, metrics) { function renderMetricsLines(inbound, outbound) { const rows = []; - rows.push(metricRow('Inbound', inbound ? formatMetricLines(inbound, true) : ['—'])); - rows.push(metricRow('Outbound', outbound ? formatMetricLines(outbound, false) : ['—'])); + rows.push(metricRow('Inbound', formatMetricLines(inbound, true))); + rows.push(metricRow('Outbound', formatMetricLines(outbound, false))); return `