diff --git a/public/client.js b/public/client.js index e81a40b..9e7e817 100644 --- a/public/client.js +++ b/public/client.js @@ -1,193 +1,267 @@ const socket = io(); -let localStream; -let peerConnection; // For Viewers: connection to the streamer -let viewerConnections = {}; // For Streamers: map of { socketId: RTCPeerConnection } -let cloudflareIceServers = []; +const localVideo = document.getElementById('localVideo'); +const remoteVideo = document.getElementById('remoteVideo'); +const statusDiv = document.getElementById('status'); -const videoDisplay = document.getElementById('videoDisplay'); -const placeholder = document.getElementById('videoPlaceholder'); -const streamListDiv = document.getElementById('streamList'); -const goLiveBtn = document.getElementById('goLiveBtn'); -const stopLiveBtn = document.getElementById('stopLiveBtn'); +// --- Global State --- +let localStream = null; +let iceServers = []; -// 1. Initialize on Load (Fetch Keys & Listen for streams) -(async function init() { +// ACTIVE connections +let currentUpstreamPC = null; +let currentDownstreamPC = null; +let downstreamId = null; // Socket ID of who we are sending to + +// FADING connections (Garbage bin for smooth transitions) +let oldUpstreamPCs = []; +let oldDownstreamPCs = []; + +// --- 1. Initialization --- +async function init() { try { - const res = await fetch('/api/get-turn-credentials'); - const data = await res.json(); - cloudflareIceServers = data.iceServers; - console.log("ICE Servers loaded."); + const response = await fetch('/api/get-turn-credentials'); + const data = await response.json(); + iceServers = data.iceServers; + console.log("Loaded ICE Servers"); } catch (e) { - console.error("Failed to load ICE servers:", e); + console.error("Using public STUN"); + iceServers = [{ urls: 'stun:stun.l.google.com:19302' }]; } -})(); +} +init(); -// --- VIEWER LOGIC (Default) --- +// --- 2. User Actions --- +async function startStream() { + try { + localStream = await navigator.mediaDevices.getUserMedia({ video: true, audio: true }); + localVideo.srcObject = localStream; + remoteVideo.style.display = 'none'; // Head doesn't need remote view -// Listen for updates to the list of available streams -socket.on('streamer_list_update', (streamers) => { - streamListDiv.innerHTML = ''; - const ids = Object.keys(streamers); + socket.emit('start_stream'); + statusDiv.innerText = "Status: Broadcasting (Head)"; + } catch (err) { + console.error("Error accessing media:", err); + alert("Could not access camera."); + } +} - if (ids.length === 0) { - streamListDiv.innerHTML = '

No active streams.

'; +function joinStream() { + socket.emit('join_stream'); + localVideo.style.display = 'none'; // Nodes don't see themselves + statusDiv.innerText = "Status: Joining chain..."; +} + +// --- 3. Health Reporting (The "Bubble Sort" Logic) --- +setInterval(calculateAndReportHealth, 5000); + +async function calculateAndReportHealth() { + // If I am Head, I am perfect. + if (localStream) { + socket.emit('update_score', 100); return; } - ids.forEach(id => { - if (id === socket.id) return; // Don't list myself + // If I'm not connected, I'm waiting. + if (!currentUpstreamPC) return; - const div = document.createElement('div'); - div.className = 'stream-item'; - div.innerHTML = ` - ${streamers[id].name} - - `; - streamListDiv.appendChild(div); - }); + try { + const stats = await currentUpstreamPC.getStats(); + let packetsLost = 0; + let packetsReceived = 0; + let jitter = 0; + + stats.forEach(report => { + if (report.type === 'inbound-rtp' && report.kind === 'video') { + packetsLost = report.packetsLost || 0; + packetsReceived = report.packetsReceived || 0; + jitter = report.jitter || 0; + } + }); + + const totalPackets = packetsReceived + packetsLost; + if (totalPackets === 0) return; + + // Calculate Score (0-100) + // Heavy penalty for loss, mild for jitter + const lossRate = packetsLost / totalPackets; + const score = 100 - (lossRate * 100 * 5) - (jitter * 100); + + const finalScore = Math.max(0, Math.min(100, score)); + console.log(`Health: ${finalScore.toFixed(0)} | Loss: ${(lossRate * 100).toFixed(2)}%`); + + socket.emit('update_score', finalScore); + + } catch (e) { + // console.error("Stats error", e); + } +} + +// --- 4. Socket Events --- + +// Instruction: Connect to a new node downstream +socket.on('connect_to_downstream', async ({ downstreamId: targetId }) => { + console.log(`Instruction: Connect downstream to ${targetId}`); + downstreamId = targetId; + await setupDownstreamConnection(targetId); }); -// User clicked "Watch" on a stream -async function watchStream(streamerId) { - if (localStream) { - alert("You cannot watch while streaming!"); - return; +// Instruction: Stop sending downstream (I am now the tail) +socket.on('disconnect_downstream', () => { + console.log("Instruction: Disconnect downstream (Became Tail)"); + if (currentDownstreamPC) { + currentDownstreamPC.close(); + currentDownstreamPC = null; + downstreamId = null; + } + // Also clean up garbage bin + oldDownstreamPCs.forEach(pc => pc.close()); + oldDownstreamPCs = []; +}); + +// Signaling Router +socket.on('signal_msg', async ({ sender, type, sdp, candidate }) => { + // A. OFFERS (Always come from Upstream) + if (type === 'offer') { + await handleUpstreamOffer(sender, sdp); } - // UI Updates - placeholder.classList.add('hidden'); - videoDisplay.classList.remove('hidden'); - videoDisplay.muted = false; // Unmute for viewer - - // Tell server we want to join - console.log("Requesting to join stream:", streamerId); - socket.emit('join_stream', streamerId); - - // Prepare to receive the Offer - socket.off('webrtc_offer'); // Clean previous listeners - socket.on('webrtc_offer', async ({ sdp, sender }) => { - if (sender !== streamerId) return; - - console.log("Received Offer from streamer."); - peerConnection = new RTCPeerConnection({ iceServers: cloudflareIceServers }); - - // When we get tracks (video/audio), show them - peerConnection.ontrack = (event) => { - console.log("Track received"); - videoDisplay.srcObject = event.streams[0]; - }; - - // Handle ICE candidates to send back to streamer - peerConnection.onicecandidate = (event) => { - if (event.candidate) { - socket.emit('ice_candidate', { target: sender, candidate: event.candidate }); - } - }; - - await peerConnection.setRemoteDescription(new RTCSessionDescription(sdp)); - const answer = await peerConnection.createAnswer(); - await peerConnection.setLocalDescription(answer); - - socket.emit('webrtc_answer', { target: sender, sdp: answer }); - }); - - // Handle ICE candidates from streamer - socket.on('ice_candidate', async ({ candidate, sender }) => { - if (peerConnection && sender === streamerId) { - await peerConnection.addIceCandidate(new RTCIceCandidate(candidate)); + // B. ANSWERS (Always come from Downstream) + else if (type === 'answer') { + if (currentDownstreamPC && sender === downstreamId) { + await currentDownstreamPC.setRemoteDescription(new RTCSessionDescription(sdp)); } - }); -} - - -// --- STREAMER LOGIC --- - -async function promptForStream() { - const name = prompt("Enter a name for your stream:", "My Cool Stream"); - if (!name) return; - - startStreaming(name); -} - -async function startStreaming(streamName) { - try { - // 1. Get User Media - localStream = await navigator.mediaDevices.getUserMedia({ video: true, audio: true }); - - // 2. Update UI - videoDisplay.srcObject = localStream; - videoDisplay.classList.remove('hidden'); - placeholder.classList.add('hidden'); - videoDisplay.muted = true; // Mute self to avoid echo - goLiveBtn.classList.add('hidden'); - stopLiveBtn.classList.remove('hidden'); - - // 3. Announce to Server - socket.emit('start_stream', streamName); - console.log("Stream announced:", streamName); - - // 4. Handle Incoming Viewers - socket.on('viewer_joined', async ({ viewerId }) => { - console.log("New viewer joined:", viewerId); - createStreamerConnection(viewerId); - }); - - // 5. Handle Answers from Viewers - socket.on('webrtc_answer', async ({ sdp, sender }) => { - const pc = viewerConnections[sender]; - if (pc) { - await pc.setRemoteDescription(new RTCSessionDescription(sdp)); - } - }); - - // 6. Handle ICE Candidates from Viewers - socket.on('ice_candidate', async ({ candidate, sender }) => { - const pc = viewerConnections[sender]; - if (pc) { - await pc.addIceCandidate(new RTCIceCandidate(candidate)); - } - }); - - // NEW: Handle instant disconnects - socket.on('viewer_left', ({ viewerId }) => { - console.log(`Viewer ${viewerId} left via socket disconnect.`); - const pc = viewerConnections[viewerId]; - if (pc) { - pc.close(); // Stop sending data immediately - delete viewerConnections[viewerId]; - } - }); - - } catch (err) { - console.error("Error starting stream:", err); - alert("Could not access camera/microphone."); } -} -function createStreamerConnection(viewerId) { - const pc = new RTCPeerConnection({ iceServers: cloudflareIceServers }); - viewerConnections[viewerId] = pc; + // C. CANDIDATES (Could be for anyone) + else if (type === 'candidate') { + const ice = new RTCIceCandidate(candidate); - // Add local stream tracks to this connection - localStream.getTracks().forEach(track => pc.addTrack(track, localStream)); + // Try Active Downstream + if (currentDownstreamPC && sender === downstreamId) { + currentDownstreamPC.addIceCandidate(ice).catch(e => { }); + } + // Try Active Upstream + if (currentUpstreamPC) { + currentUpstreamPC.addIceCandidate(ice).catch(e => { }); + } + // Try Fading Connections (Crucial for smooth swap!) + oldUpstreamPCs.forEach(pc => pc.addIceCandidate(ice).catch(e => { })); + oldDownstreamPCs.forEach(pc => pc.addIceCandidate(ice).catch(e => { })); + } +}); - // Send ICE candidates to this specific viewer - pc.onicecandidate = (event) => { +socket.on('error_msg', (msg) => alert(msg)); +socket.on('request_keyframe', () => console.log("Keyframe requested by network")); +socket.on('stream_ended', () => { + alert("Stream ended"); + location.reload(); +}); + +// --- 5. WebRTC Logic (Make-Before-Break) --- + +// --- UPSTREAM Handling (Receiving) --- +async function handleUpstreamOffer(senderId, sdp) { + console.log(`Negotiating new Upstream connection from ${senderId}`); + + const newPC = new RTCPeerConnection({ iceServers }); + + newPC.ontrack = (event) => { + console.log(`New Upstream Track (${event.track.kind}) Active.`); + + // A. Update Screen (Idempotent: doing this twice is fine) + remoteVideo.srcObject = event.streams[0]; + statusDiv.innerText = "Status: Connected (Stable)"; + + // B. Hot-Swap the Relay + if (currentDownstreamPC) { + const sender = currentDownstreamPC.getSenders().find(s => s.track && s.track.kind === event.track.kind); + if (sender) { + sender.replaceTrack(event.track); + } + } + + // C. Retire the OLD connection (THE FIX IS HERE) + // We only queue the old PC if it exists AND it isn't the one we just created. + if (currentUpstreamPC && currentUpstreamPC !== newPC) { + const oldPC = currentUpstreamPC; + oldUpstreamPCs.push(oldPC); + + console.log("Queueing old upstream connection for death in 4s..."); + setTimeout(() => { + oldPC.close(); + // Remove from garbage bin + oldUpstreamPCs = oldUpstreamPCs.filter(pc => pc !== oldPC); + console.log("Closed old upstream connection."); + }, 4000); + } + + // D. Set New as Current + currentUpstreamPC = newPC; + }; + + newPC.onicecandidate = (event) => { if (event.candidate) { - socket.emit('ice_candidate', { target: viewerId, candidate: event.candidate }); + socket.emit('signal_msg', { target: senderId, type: 'candidate', candidate: event.candidate }); } }; - // Create Offer - pc.createOffer().then(offer => { - pc.setLocalDescription(offer); - socket.emit('webrtc_offer', { target: viewerId, sdp: offer }); - }); + await newPC.setRemoteDescription(new RTCSessionDescription(sdp)); + const answer = await newPC.createAnswer(); + await newPC.setLocalDescription(answer); - // Cleanup on disconnect - pc.onconnectionstatechange = () => { - if (pc.connectionState === 'disconnected' || pc.connectionState === 'failed') { - delete viewerConnections[viewerId]; + socket.emit('signal_msg', { target: senderId, type: 'answer', sdp: answer }); +} + +// --- DOWNSTREAM Handling (Sending) --- +async function setupDownstreamConnection(targetId) { + console.log(`Setting up downstream to ${targetId}`); + + // 1. Retire existing downstream (Fade Out) + if (currentDownstreamPC) { + console.log("Moving current downstream to background (Overlap Mode)"); + const oldPC = currentDownstreamPC; + oldDownstreamPCs.push(oldPC); + + // Keep sending for 5 seconds so they have time to connect to their NEW source + setTimeout(() => { + oldPC.close(); + oldDownstreamPCs = oldDownstreamPCs.filter(pc => pc !== oldPC); + console.log("Closed old downstream connection"); + }, 5000); + } + + // 2. Create NEW downstream + currentDownstreamPC = new RTCPeerConnection({ iceServers }); + + // 3. Add Tracks + if (localStream) { + // Head: Send Camera + localStream.getTracks().forEach(track => currentDownstreamPC.addTrack(track, localStream)); + } else if (currentUpstreamPC) { + // Relay: Send what we are receiving + currentUpstreamPC.getReceivers().forEach(receiver => { + if (receiver.track) { + // "remoteVideo.srcObject" ensures stream ID consistency + currentDownstreamPC.addTrack(receiver.track, remoteVideo.srcObject); + } + }); + } + + currentDownstreamPC.onicecandidate = (event) => { + if (event.candidate) { + socket.emit('signal_msg', { target: targetId, type: 'candidate', candidate: event.candidate }); } }; + + const offer = await currentDownstreamPC.createOffer(); + + // 4. BITRATE HACK: Force 4Mbps limit (Standard WebRTC defaults low) + offer.sdp = offer.sdp.replace(/b=AS:([0-9]+)/g, 'b=AS:4000'); + if (!offer.sdp.includes('b=AS:')) { + // If no bandwidth line exists, add it to the video section + offer.sdp = offer.sdp.replace(/(m=video.*\r\n)/, '$1b=AS:4000\r\n'); + } + + await currentDownstreamPC.setLocalDescription(offer); + socket.emit('signal_msg', { target: targetId, type: 'offer', sdp: offer }); } \ No newline at end of file diff --git a/public/index.html b/public/index.html index d5f6e61..961bc94 100644 --- a/public/index.html +++ b/public/index.html @@ -3,138 +3,50 @@ - Live Stream Platform + + P2P Daisy Chain Livestream - -
- -
-
Select a stream to watch
- -
+

Daisy Chain Relay

+
+ +
+ + +
Status: Disconnected
diff --git a/server.js b/server.js index 5e85deb..805144d 100644 --- a/server.js +++ b/server.js @@ -20,62 +20,168 @@ app.get('/api/get-turn-credentials', async (req, res) => { ); res.json(response.data); } catch (error) { - console.error("Cloudflare error:", error.message); + console.error("Cloudflare Error:", error.message); res.status(500).json({ error: "Failed to fetch credentials" }); } }); -// --- Signaling Logic --- -let streamers = {}; // socketId -> { name: "Gaming Stream" } -// Add a map to track relationships: viewerSocketId -> streamerSocketId -let relationships = {}; +// --- Daisy Chain Logic --- +let strand = []; // Ordered list of socket IDs +let scores = {}; // Map: socket.id -> score (0-100) io.on('connection', (socket) => { - // 1. Immediately send the list of active streams to the new user - socket.emit('streamer_list_update', streamers); + console.log(`User connected: ${socket.id}`); - // 2. User wants to go live - socket.on('start_stream', (name) => { - streamers[socket.id] = { name: name }; - io.emit('streamer_list_update', streamers); // Notify everyone + // Default score for new users (optimistic, so they get a chance to prove themselves) + scores[socket.id] = 80; + + socket.on('update_score', (score) => { + scores[socket.id] = score; }); - // 3. User wants to watch a specific streamer - socket.on('join_stream', (streamerId) => { - relationships[socket.id] = streamerId; // Record the link - io.to(streamerId).emit('viewer_joined', { viewerId: socket.id }); + // A. Start Stream (Head) + socket.on('start_stream', () => { + if (strand.length > 0) { + socket.emit('error_msg', "Stream already in progress."); + return; + } + strand.push(socket.id); + console.log("Stream started. Head:", socket.id); + socket.emit('role_assigned', 'head'); }); - // 4. WebRTC Signaling (Offer/Answer/ICE) - socket.on('webrtc_offer', (data) => { - io.to(data.target).emit('webrtc_offer', { sdp: data.sdp, sender: socket.id }); + // B. Join Stream (Tail) + socket.on('join_stream', () => { + if (strand.length === 0) { + socket.emit('error_msg', "No active stream to join."); + return; + } + + const upstreamPeerId = strand[strand.length - 1]; + strand.push(socket.id); + + console.log(`User ${socket.id} joined. Upstream: ${upstreamPeerId}`); + + // 1. Tell Upstream to connect to ME + io.to(upstreamPeerId).emit('connect_to_downstream', { downstreamId: socket.id }); + + // 2. Request Keyframe from Head (Delayed to allow connection setup) + setTimeout(() => { + if (strand[0]) io.to(strand[0]).emit('request_keyframe'); + }, 2000); }); - socket.on('webrtc_answer', (data) => { - io.to(data.target).emit('webrtc_answer', { sdp: data.sdp, sender: socket.id }); + // C. Signaling + socket.on('signal_msg', ({ target, type, sdp, candidate }) => { + io.to(target).emit('signal_msg', { sender: socket.id, type, sdp, candidate }); }); - socket.on('ice_candidate', (data) => { - io.to(data.target).emit('ice_candidate', { candidate: data.candidate, sender: socket.id }); + // D. Keyframe Relay + socket.on('relay_keyframe_upstream', () => { + // In a real robust app, you'd bubble this up the chain. + // Here we just blast the Source directly if we know who they are. + if (strand[0]) io.to(strand[0]).emit('request_keyframe'); }); - // 5. Cleanup + // E. Disconnects (Healing) socket.on('disconnect', () => { - // 1. If a STREAMER left (existing code) - if (streamers[socket.id]) { - delete streamers[socket.id]; - io.emit('streamer_list_update', streamers); + const index = strand.indexOf(socket.id); + if (index === -1) return; + + console.log(`User ${socket.id} disconnected. Index: ${index}`); + + // Head left + if (index === 0) { + strand = []; + io.emit('stream_ended'); + return; } - // 2. If a VIEWER left (NEW CODE) - if (relationships[socket.id]) { - const streamerId = relationships[socket.id]; - // Tell the specific streamer that this specific viewer is gone - io.to(streamerId).emit('viewer_left', { viewerId: socket.id }); - delete relationships[socket.id]; + // Tail left + if (index === strand.length - 1) { + strand.pop(); + return; } + + // Middle left (Stitch) + const upstreamNode = strand[index - 1]; + const downstreamNode = strand[index + 1]; + strand.splice(index, 1); + + console.log(`Healing: ${upstreamNode} -> ${downstreamNode}`); + io.to(upstreamNode).emit('connect_to_downstream', { downstreamId: downstreamNode }); + + setTimeout(() => { + if (strand[0]) io.to(strand[0]).emit('request_keyframe'); + }, 2000); }); }); +// --- THE OPTIMIZER LOOP --- +setInterval(() => { + if (strand.length < 3) return; // Need at least Head + 2 Nodes to swap anything + + // We iterate from the 2nd node (Index 1) to the end. + // Index 0 is HEAD (Source), we never move the source. + let swapped = false; + + // We look for ONE swap per cycle to minimize chaos. + for (let i = 1; i < strand.length - 1; i++) { + const current = strand[i]; + const next = strand[i + 1]; + + const scoreCurrent = scores[current] || 0; + const scoreNext = scores[next] || 0; + + // THRESHOLD: Only swap if the next guy is significantly stronger (+15 points) + // This prevents users flip-flopping constantly due to minor fluctuations. + if (scoreNext > scoreCurrent + 15) { + console.log(`Swapping Weak ${current} (${scoreCurrent}) with Strong ${next} (${scoreNext})`); + + performSwap(i, i + 1); + swapped = true; + break; // Stop after one swap to let network settle + } + } +}, 10000); // Run every 10 seconds + +function performSwap(indexA, indexB) { + // 1. Update the Array (Logical Swap) + const nodeA = strand[indexA]; + const nodeB = strand[indexB]; + strand[indexA] = nodeB; + strand[indexB] = nodeA; + + // State after swap: + // [ ... -> Upstream -> NodeB -> NodeA -> Downstream -> ... ] + + // Identify the relevant neighbors + const upstreamNode = strand[indexA - 1]; // The node before the pair + const downstreamNode = strand[indexB + 1]; // The node after the pair + + // 2. Instruct the Upstream to connect to the NEW first node (NodeB) + if (upstreamNode) { + io.to(upstreamNode).emit('connect_to_downstream', { downstreamId: nodeB }); + } + + // 3. Instruct NodeB to connect to NodeA (The internal link) + io.to(nodeB).emit('connect_to_downstream', { downstreamId: nodeA }); + + // 4. Instruct NodeA to connect to the Downstream (or null if end) + if (downstreamNode) { + io.to(nodeA).emit('connect_to_downstream', { downstreamId: downstreamNode }); + } else { + // If NodeA is now the tail, it disconnects its downstream + io.to(nodeA).emit('disconnect_downstream'); // You might need to add this handler to client + } + + // 5. Trigger Keyframes to heal the image + setTimeout(() => { + if (strand[0]) io.to(strand[0]).emit('request_keyframe'); + }, 1000); +} + + + const PORT = process.env.PORT || 3000; server.listen(PORT, () => console.log(`Server running on http://localhost:${PORT}`)); \ No newline at end of file