2025-12-10 21:15:04 -05:00

236 lines
7.3 KiB
JavaScript

require('dotenv').config();
const express = require('express');
const http = require('http');
const { Server } = require('socket.io');
const axios = require('axios');
// Express application; everything under ./public is served as static assets.
const app = express();
app.use(express.static('public'));

// One HTTP server carries both the Express routes and the Socket.IO transport.
const server = http.createServer(app);
const io = new Server(server);
// --- TURN / ICE credentials (proxied from the upstream credentials endpoint) ---
// GET /api/get-turn-credentials
// Relays the upstream JSON body to the caller. If the upstream is unreachable,
// too slow, answers with a non-2xx status, or sends a body that is not valid
// JSON, the caller receives a STUN-only configuration instead so it can still
// attempt a connection.
app.get('/api/get-turn-credentials', async (req, res) => {
  // Upper bound on how long a caller waits before the fallback is returned.
  const upstreamTimeoutMs = 10000;
  try {
    // const response = await axios.post(
    // `https://rtc.live.cloudflare.com/v1/turn/keys/${process.env.CLOUDFLARE_APP_ID}/credentials/generate-ice-servers`,
    // { ttl: 86400 },
    // { headers: { 'Authorization': `Bearer ${process.env.CLOUDFLARE_API_TOKEN}`, 'Content-Type': 'application/json' } }
    // );
    const response = await fetch('https://strandcast.benglsoftware.com/api/get-turn-credentials', {
      signal: AbortSignal.timeout(upstreamTimeoutMs),
    });
    // fetch() rejects only on network-level failure; an HTTP error status still
    // resolves, so it must be turned into a failure explicitly. Otherwise the
    // upstream's error body would be handed to the client as ICE configuration.
    if (!response.ok) {
      throw new Error(`Upstream responded with HTTP ${response.status}`);
    }
    res.json(await response.json());
  } catch (error) {
    console.error('Cloudflare Error:', error.message);
    res.json({ iceServers: [{ urls: 'stun:stun.l.google.com:19302' }] });
  }
});
// --- STATE MANAGEMENT ---
// Format: { "StreamName": [socketId1, socketId2, ...] }
// Index 0 is the head of the chain; each later entry is wired downstream of the one before it.
// Stream names are chosen by clients, so this dictionary has no prototype: a name
// such as "constructor" or "__proto__" must resolve to undefined until a stream
// with that name really exists, never to an inherited Object member.
const streams = Object.create(null);
const socketStreamMap = {}; // Helper: socketId -> StreamName (for fast lookups on disconnect)
const scores = {}; // socketId -> Health Score (0-100)
const metrics = {}; // socketId -> { inbound, outbound, ts }
// Per-connection setup: the handlers registered inside this callback are attached
// once for every client socket that connects.
io.on('connection', (socket) => {
console.log(`User connected: ${socket.id}`);
// Every socket starts with a health score of 80; the 'update_score' handler overwrites it.
scores[socket.id] = 80;
// 0. Send the list of streams to the new user
socket.emit('stream_list_update', Object.keys(streams));
// A. Start New Stream
// Makes the caller the head of a new chain called `streamName`, replies with
// 'role_assigned' = 'head', and pushes the refreshed stream list to every socket.
// Replies with 'error_msg' when the name is unusable, already taken, or the
// caller is already part of a stream.
socket.on('start_stream', (streamName) => {
  // The name comes straight from the client and is used as a key of `streams`.
  // Non-strings would be coerced into keys like "[object Object]", and
  // '__proto__' assigned on a plain object replaces its prototype instead of
  // creating an entry, so both are refused.
  const isUsableName =
    typeof streamName === 'string' && streamName.trim() !== '' && streamName !== '__proto__';
  if (!isUsableName) {
    socket.emit('error_msg', 'Invalid stream name.');
    return;
  }
  // Own-property check so inherited keys ('constructor', 'toString', ...) are
  // not mistaken for existing streams.
  if (Object.hasOwn(streams, streamName)) {
    socket.emit('error_msg', 'Stream name already exists.');
    return;
  }
  if (socketStreamMap[socket.id]) {
    socket.emit('error_msg', 'You are already in a stream.');
    return;
  }
  streams[streamName] = [socket.id];
  socketStreamMap[socket.id] = streamName;
  console.log(`Stream started: ${streamName} by ${socket.id}`);
  socket.emit('role_assigned', 'head');
  // Broadcast new list to everyone in the lobby
  io.emit('stream_list_update', Object.keys(streams));
});
// B. Join Existing Stream
// Appends the caller to the tail of the chain for `streamName`, tells the
// previous tail to connect to it, and asks the head for a keyframe 2 seconds later.
// Replies with 'error_msg' when the stream is unknown or the caller is already
// part of a stream.
socket.on('join_stream', (streamName) => {
  // Own-property lookup only: with a plain-object map, a name like 'constructor'
  // would otherwise resolve to an inherited function and `chain.push` below
  // would throw inside the handler.
  const chain =
    typeof streamName === 'string' && Object.hasOwn(streams, streamName)
      ? streams[streamName]
      : undefined;
  if (!chain) {
    socket.emit('error_msg', 'Stream does not exist.');
    return;
  }
  // Same rule as 'start_stream': one stream per socket. Without it a socket
  // could be listed twice in a chain (or be told to connect to itself), and a
  // socket that joined a second stream would stay behind as a dead entry in the first.
  if (socketStreamMap[socket.id]) {
    socket.emit('error_msg', 'You are already in a stream.');
    return;
  }
  const upstreamPeerId = chain[chain.length - 1];
  chain.push(socket.id);
  socketStreamMap[socket.id] = streamName;
  console.log(`User ${socket.id} joined stream "${streamName}". Upstream: ${upstreamPeerId}`);
  // Tell Upstream to connect to ME
  io.to(upstreamPeerId).emit('connect_to_downstream', { downstreamId: socket.id });
  // Request Keyframe (Targeting the Head of THIS stream)
  // The stream is looked up again when the timer fires because it may have ended meanwhile.
  setTimeout(() => {
    if (streams[streamName] && streams[streamName][0]) {
      io.to(streams[streamName][0]).emit('request_keyframe');
    }
  }, 2000);
});
// C. Signaling
// Forwards a signaling message (type / sdp / candidate) to the socket named in
// `target`, stamping it with the sender's id. Malformed messages are dropped.
socket.on('signal_msg', (message) => {
  // Destructuring a missing or null payload throws, and an exception inside a
  // handler is not caught anywhere, so the payload shape is checked first.
  if (message === null || typeof message !== 'object') return;
  const { target, type, sdp, candidate } = message;
  if (typeof target !== 'string' || target === '') return;
  io.to(target).emit('signal_msg', { sender: socket.id, type, sdp, candidate });
});
// D. Score Updates
// Stores the caller's self-reported health score. The optimizer compares these
// values arithmetically, so only finite numbers are accepted and they are
// clamped to the 0-100 range the score is defined on. Anything else is ignored
// and the previous score is kept.
socket.on('update_score', (score) => {
  // Numeric strings are accepted, but an empty string must not become 0.
  const isNumericString = typeof score === 'string' && score.trim() !== '';
  const value = isNumericString ? Number(score) : score;
  if (typeof value !== 'number' || !Number.isFinite(value)) return;
  scores[socket.id] = Math.min(100, Math.max(0, value));
});
// D2. Metrics Updates
// Records the caller's latest metrics together with the server-side receive time.
socket.on('report_metrics', (payload) => {
  // Only object payloads contribute fields. Spreading a string or an array
  // would store index-keyed junk ("0", "1", ...), so those are treated like an
  // empty report and just refresh the timestamp.
  const isRecord = payload !== null && typeof payload === 'object' && !Array.isArray(payload);
  const fields = isRecord ? payload : {};
  metrics[socket.id] = { ...fields, ts: Date.now() };
});
// E. Keyframe Relay
// Passes a keyframe request from any member of a stream on to that stream's head.
// Does nothing when the caller is not in a stream or the stream no longer exists.
socket.on('relay_keyframe_upstream', () => {
  const ownStream = socketStreamMap[socket.id];
  if (!ownStream) return;

  const members = streams[ownStream];
  if (!members) return;

  io.to(members[0]).emit('request_keyframe');
});
// F. Disconnects (Healing)
// Cleans up after a socket that went away and repairs the chain it was part of:
//   - head left   -> the stream is destroyed and every remaining member is told 'stream_ended'
//   - tail left   -> the new tail is told to drop its downstream link
//   - middle left -> the neighbours are stitched together and the head is asked for a keyframe
socket.on('disconnect', () => {
  delete scores[socket.id];
  delete metrics[socket.id];

  const streamName = socketStreamMap[socket.id];
  if (!streamName) return; // User wasn't in a stream

  // Forget the membership right away so that none of the early exits below can
  // leave a stale socketId -> stream entry behind.
  delete socketStreamMap[socket.id];

  const chain = streams[streamName];
  if (!chain) return;
  const index = chain.indexOf(socket.id);
  if (index === -1) return;
  console.log(`User ${socket.id} left stream "${streamName}". Index: ${index}`);

  // Case 1: Head Left -> Destroy Stream
  if (index === 0) {
    delete streams[streamName];
    // Notify everyone in this stream that it ended
    chain.forEach((peerId) => {
      if (peerId !== socket.id) io.to(peerId).emit('stream_ended');
      delete socketStreamMap[peerId];
    });
    io.emit('stream_list_update', Object.keys(streams)); // Update Lobby
    return;
  }

  // Case 2: Tail Left -> Just pop
  if (index === chain.length - 1) {
    chain.pop();
    const newTail = chain[chain.length - 1];
    if (newTail) io.to(newTail).emit('disconnect_downstream');
    return;
  }

  // Case 3: Middle Left -> Stitch
  const upstreamNode = chain[index - 1];
  const downstreamNode = chain[index + 1];
  chain.splice(index, 1); // Remove node
  console.log(`Healing "${streamName}": ${upstreamNode} -> ${downstreamNode}`);
  io.to(upstreamNode).emit('connect_to_downstream', { downstreamId: downstreamNode });
  // The stream is looked up again when the timer fires because it may have ended meanwhile.
  setTimeout(() => {
    if (streams[streamName] && streams[streamName][0]) {
      io.to(streams[streamName][0]).emit('request_keyframe');
    }
  }, 2000);
});
// G. Monitor subscription
// Adds the caller to the 'monitors' room and immediately sends it the current
// topology, scores and metrics.
socket.on('join_monitor', () => {
  const snapshot = { streams, scores, metrics };
  socket.join('monitors');
  socket.emit('monitor_update', snapshot);
});
});
// 2. Broadcast State to Monitors (1Hz)
// Once per second the whole topology plus scores and metrics goes out to the
// 'monitors' room. The tick is skipped when that room is missing or empty.
setInterval(() => {
  const watchers = io.sockets.adapter.rooms.get('monitors');
  const hasWatchers = Boolean(watchers) && watchers.size > 0;
  if (!hasWatchers) return;

  io.to('monitors').emit('monitor_update', { streams, scores, metrics });
}, 1000);
// --- THE MULTI-STREAM OPTIMIZER ---
// Every 10 seconds each chain is scanned from the first viewer towards the tail.
// The first adjacent pair whose downstream member outscores its upstream
// neighbour by more than 15 points is swapped. The head (index 0) never moves,
// and at most one swap is made per stream per tick.
setInterval(() => {
  for (const [streamName, chain] of Object.entries(streams)) {
    // A head plus fewer than two viewers leaves nothing to reorder.
    if (chain.length < 3) continue;

    for (let upper = 1; upper + 1 < chain.length; upper += 1) {
      const lower = upper + 1;
      const upperId = chain[upper];
      const lowerId = chain[lower];
      // A missing score counts as 0.
      const upperScore = scores[upperId] || 0;
      const lowerScore = scores[lowerId] || 0;

      if (lowerScore > upperScore + 15) {
        console.log(`[${streamName}] Swapping ${upperId} (${upperScore}) with ${lowerId} (${lowerScore})`);
        performSwap(chain, upper, lower);
        break; // One swap per stream per cycle
      }
    }
  }
}, 10000);
// Exchanges the members at indexA and indexB of `chain` in place and rewires
// the peers around them:
//   - the member before indexA (if any) is told to feed the member now at indexA
//   - the member now at indexA is told to feed the member now at indexB
//   - the member now at indexB is told to feed whoever follows it, or to drop
//     its downstream link when it has become the tail
// Afterwards the head of the chain is asked for a keyframe twice, 1 s and 4 s later.
function performSwap(chain, indexA, indexB) {
  const movedBack = chain[indexA];
  const movedForward = chain[indexB];
  [chain[indexA], chain[indexB]] = [movedForward, movedBack];

  const before = chain[indexA - 1];
  const after = chain[indexB + 1];

  if (before) {
    io.to(before).emit('connect_to_downstream', { downstreamId: movedForward });
  }
  io.to(movedForward).emit('connect_to_downstream', { downstreamId: movedBack });
  if (after) {
    io.to(movedBack).emit('connect_to_downstream', { downstreamId: after });
  } else {
    io.to(movedBack).emit('disconnect_downstream');
  }

  // Request Keyframe from Head
  const askHeadForKeyframe = () => {
    if (chain[0]) io.to(chain[0]).emit('request_keyframe');
  };
  [1000, 4000].forEach((delayMs) => {
    setTimeout(askHeadForKeyframe, delayMs);
  });
}
// Listen on the port given by the PORT environment variable, or 3000 when it is unset or empty.
const PORT = process.env.PORT || 3000;
server.listen(PORT, () => {
  console.log(`Server running on http://localhost:${PORT}`);
});