require('dotenv').config();
const express = require('express');
const http = require('http');
const { Server } = require('socket.io');
const axios = require('axios');

const app = express();
const server = http.createServer(app);
const io = new Server(server);

app.use(express.static('public'));

// --- Cloudflare Credentials ---

/**
 * GET /api/get-turn-credentials
 *
 * Fetches an ICE server configuration from the upstream credentials endpoint
 * and forwards its JSON body to the client. If the request fails, the upstream
 * answers with a non-2xx status, or the body is not valid JSON, a single
 * public STUN server is returned instead so the client always receives a
 * usable configuration.
 */
app.get('/api/get-turn-credentials', async (req, res) => {
  try {
    // Direct Cloudflare call, kept for reference:
    // const response = await axios.post(
    //   `https://rtc.live.cloudflare.com/v1/turn/keys/${process.env.CLOUDFLARE_APP_ID}/credentials/generate-ice-servers`,
    //   { ttl: 86400 },
    //   { headers: { 'Authorization': `Bearer ${process.env.CLOUDFLARE_API_TOKEN}`, 'Content-Type': 'application/json' } }
    // );
    const response = await fetch('https://strandcast.benglsoftware.com/api/get-turn-credentials');
    // fetch() only rejects on network failure; an HTTP error body must not be
    // forwarded to the client as if it were an ICE configuration.
    if (!response.ok) {
      throw new Error(`Upstream responded with HTTP ${response.status}`);
    }
    res.json(await response.json());
  } catch (error) {
    console.error('Cloudflare Error:', error.message);
    res.json({ iceServers: [{ urls: 'stun:stun.l.google.com:19302' }] });
  }
});

// --- STATE MANAGEMENT ---

// Format: { "StreamName": [socketId1, socketId2, ...] }
// Index 0 of each chain is the head; every later entry receives from the one before it.
// Stream names come from clients, so the dictionary has no prototype: names such as
// "constructor" or "__proto__" are then ordinary keys instead of inherited members.
const streams = Object.create(null);
const socketStreamMap = {}; // Helper: socketId -> StreamName (for fast lookups on disconnect)
const scores = {}; // socketId -> Health Score (0-100)
const metrics = {}; // socketId -> { inbound, outbound, ts }

/** Delay before asking a head for a keyframe after the chain topology changed. */
const KEYFRAME_REQUEST_DELAY_MS = 2000;

/**
 * Tells whether a client-supplied value can be used as a stream name.
 *
 * @param {unknown} name - Value received from the client.
 * @returns {boolean} True for a string containing at least one non-whitespace character.
 */
function isValidStreamName(name) {
  return typeof name === 'string' && name.trim().length > 0;
}

/**
 * Emits 'request_keyframe' to the head of a stream after a delay.
 * The stream is looked up when the timer fires, so nothing is sent if the
 * stream was destroyed in the meantime.
 *
 * @param {string} streamName - Key into `streams`.
 * @param {number} [delayMs] - Milliseconds to wait before emitting.
 */
function scheduleKeyframeRequest(streamName, delayMs = KEYFRAME_REQUEST_DELAY_MS) {
  setTimeout(() => {
    const chain = streams[streamName];
    if (chain && chain[0]) {
      io.to(chain[0]).emit('request_keyframe');
    }
  }, delayMs);
}

io.on('connection', (socket) => {
  console.log(`User connected: ${socket.id}`);
  scores[socket.id] = 80;

  // 0. Send the list of streams to the new user
  socket.emit('stream_list_update', Object.keys(streams));

  // A. Start New Stream
  socket.on('start_stream', (streamName) => {
    if (!isValidStreamName(streamName)) {
      socket.emit('error_msg', 'Invalid stream name.');
      return;
    }
    if (streams[streamName]) {
      socket.emit('error_msg', 'Stream name already exists.');
      return;
    }
    if (socketStreamMap[socket.id]) {
      socket.emit('error_msg', 'You are already in a stream.');
      return;
    }

    streams[streamName] = [socket.id];
    socketStreamMap[socket.id] = streamName;
    console.log(`Stream started: ${streamName} by ${socket.id}`);
    socket.emit('role_assigned', 'head');

    // Broadcast new list to everyone in the lobby
    io.emit('stream_list_update', Object.keys(streams));
  });

  // B. Join Existing Stream
  socket.on('join_stream', (streamName) => {
    const chain = isValidStreamName(streamName) ? streams[streamName] : undefined;
    if (!chain) {
      socket.emit('error_msg', 'Stream does not exist.');
      return;
    }
    // A socket may sit in only one chain, and only once. Otherwise its id would
    // be duplicated (or left behind in another chain) and disconnect cleanup,
    // which follows a single socketStreamMap entry, could never remove it.
    if (socketStreamMap[socket.id]) {
      socket.emit('error_msg', 'You are already in a stream.');
      return;
    }

    const upstreamPeerId = chain[chain.length - 1];
    chain.push(socket.id);
    socketStreamMap[socket.id] = streamName;
    console.log(`User ${socket.id} joined stream "${streamName}". Upstream: ${upstreamPeerId}`);

    // Tell Upstream to connect to ME
    io.to(upstreamPeerId).emit('connect_to_downstream', { downstreamId: socket.id });

    // Request Keyframe (Targeting the Head of THIS stream)
    scheduleKeyframeRequest(streamName);
  });

  // C. Signaling: relay the message to `target`, stamped with the sender's id
  socket.on('signal_msg', (payload) => {
    // A missing or null payload must not throw inside the handler.
    const { target, type, sdp, candidate } = payload ?? {};
    if (typeof target !== 'string' || target.length === 0) return;
    io.to(target).emit('signal_msg', { sender: socket.id, type, sdp, candidate });
  });

  // D. Score Updates
  socket.on('update_score', (score) => {
    // Scores are compared arithmetically elsewhere, so only finite numbers are
    // stored, clamped to the 0-100 range; anything else is ignored.
    const value = typeof score === 'string' ? Number.parseFloat(score) : score;
    if (typeof value !== 'number' || !Number.isFinite(value)) return;
    scores[socket.id] = Math.min(100, Math.max(0, value));
  });

  // D2. Metrics Updates (stamped with the server receive time)
  socket.on('report_metrics', (payload) => {
    metrics[socket.id] = { ...payload, ts: Date.now() };
  });

  // E. Keyframe Relay: forward the request to the head of the sender's stream
  socket.on('relay_keyframe_upstream', () => {
    const streamName = socketStreamMap[socket.id];
    if (streamName && streams[streamName]) {
      io.to(streams[streamName][0]).emit('request_keyframe');
    }
  });

  // F. Disconnects (Healing)
  socket.on('disconnect', () => {
    delete scores[socket.id];
    delete metrics[socket.id];

    const streamName = socketStreamMap[socket.id];
    // Drop the mapping up front so no early return below can leave it behind.
    delete socketStreamMap[socket.id];
    if (!streamName) return; // User wasn't in a stream

    const chain = streams[streamName];
    if (!chain) return;

    const index = chain.indexOf(socket.id);
    if (index === -1) return;

    console.log(`User ${socket.id} left stream "${streamName}". Index: ${index}`);

    // Case 1: Head Left -> Destroy Stream
    if (index === 0) {
      delete streams[streamName];
      // Notify everyone in this stream that it ended
      chain.forEach((peerId) => {
        if (peerId !== socket.id) io.to(peerId).emit('stream_ended');
        delete socketStreamMap[peerId];
      });
      io.emit('stream_list_update', Object.keys(streams)); // Update Lobby
      return;
    }

    // Case 2: Tail Left -> Just pop
    if (index === chain.length - 1) {
      chain.pop();
      const newTail = chain[chain.length - 1];
      if (newTail) io.to(newTail).emit('disconnect_downstream');
      return;
    }

    // Case 3: Middle Left -> Stitch the two neighbours together
    const upstreamNode = chain[index - 1];
    const downstreamNode = chain[index + 1];
    chain.splice(index, 1); // Remove node
    console.log(`Healing "${streamName}": ${upstreamNode} -> ${downstreamNode}`);
    io.to(upstreamNode).emit('connect_to_downstream', { downstreamId: downstreamNode });
    scheduleKeyframeRequest(streamName);
  });

  // G. Monitoring: subscribe this socket to the periodic state broadcast
  socket.on('join_monitor', () => {
    socket.join('monitors');
    // Send immediate state
    socket.emit('monitor_update', { streams, scores, metrics });
  });
});

// 2. Broadcast State to Monitors (1Hz)
// We send the full topology and health scores every second.
// Once per second, push the current state to the 'monitors' room.
// Nothing is emitted while the room is missing or empty.
setInterval(function broadcastToMonitors() {
  const watchers = io.sockets.adapter.rooms.get('monitors');
  if (!watchers || !(watchers.size > 0)) return;
  io.to('monitors').emit('monitor_update', { streams, scores, metrics });
}, 1000);

// --- THE MULTI-STREAM OPTIMIZER ---
// Every 10 seconds, walk each chain from index 1 and look at adjacent pairs.
// The first pair whose downstream node scores more than 15 points above its
// upstream neighbour is swapped; at most one swap happens per stream per cycle.
// Index 0 (the head) is never part of a pair.
setInterval(function optimizeChains() {
  // A missing score counts as 0.
  const healthOf = (socketId) => scores[socketId] || 0;

  for (const [streamName, chain] of Object.entries(streams)) {
    // Chains shorter than three nodes have no pair to examine.
    if (chain.length < 3) continue;

    for (let pos = 1; pos + 1 < chain.length; pos += 1) {
      const upper = chain[pos];
      const lower = chain[pos + 1];
      const upperScore = healthOf(upper);
      const lowerScore = healthOf(lower);

      if (lowerScore > upperScore + 15) {
        console.log(`[${streamName}] Swapping ${upper} (${upperScore}) with ${lower} (${lowerScore})`);
        performSwap(chain, pos, pos + 1);
        break; // One swap per stream per cycle
      }
    }
  }
}, 10000);

/**
 * Exchanges the nodes at two positions of a chain and re-wires their links.
 *
 * After the exchange, the node in front of `indexA` (if any) is told to connect
 * to the node now at `indexA`, that node is told to connect to the node now at
 * `indexB`, and the node now at `indexB` is told either to connect to whatever
 * follows it or, if nothing follows, to drop its downstream link. The node at
 * index 0 of the chain is then asked for a keyframe after 1 s and again after 4 s.
 *
 * @param {string[]} chain - Ordered socket ids of one stream; mutated in place.
 * @param {number} indexA - Position of the node being moved down.
 * @param {number} indexB - Position of the node being moved up.
 */
function performSwap(chain, indexA, indexB) {
  const demoted = chain[indexA];
  const promoted = chain[indexB];
  [chain[indexA], chain[indexB]] = [promoted, demoted];

  const before = chain[indexA - 1];
  const after = chain[indexB + 1];

  if (before) {
    io.to(before).emit('connect_to_downstream', { downstreamId: promoted });
  }
  io.to(promoted).emit('connect_to_downstream', { downstreamId: demoted });
  if (after) {
    io.to(demoted).emit('connect_to_downstream', { downstreamId: after });
  } else {
    io.to(demoted).emit('disconnect_downstream');
  }

  // Request Keyframe from Head, twice
  for (const delayMs of [1000, 4000]) {
    setTimeout(() => {
      if (chain[0]) io.to(chain[0]).emit('request_keyframe');
    }, delayMs);
  }
}

const PORT = process.env.PORT || 3000;
server.listen(PORT, () => {
  console.log(`Server running on http://localhost:${PORT}`);
});