227 lines
6.6 KiB
JavaScript
227 lines
6.6 KiB
JavaScript
require('dotenv').config();
|
|
const express = require('express');
|
|
const http = require('http');
|
|
const { Server } = require("socket.io");
|
|
const axios = require('axios');
|
|
|
|
const app = express();
|
|
const server = http.createServer(app);
|
|
const io = new Server(server);
|
|
|
|
app.use(express.static('public'));
|
|
|
|
// --- Cloudflare Credentials ---
|
|
app.get('/api/get-turn-credentials', async (req, res) => {
|
|
try {
|
|
const response = await axios.post(
|
|
`https://rtc.live.cloudflare.com/v1/turn/keys/${process.env.CLOUDFLARE_APP_ID}/credentials/generate-ice-servers`,
|
|
{ ttl: 86400 },
|
|
{ headers: { 'Authorization': `Bearer ${process.env.CLOUDFLARE_API_TOKEN}`, 'Content-Type': 'application/json' } }
|
|
);
|
|
res.json(response.data);
|
|
} catch (error) {
|
|
console.error("Cloudflare Error:", error.message);
|
|
res.json({ iceServers: [{ urls: 'stun:stun.l.google.com:19302' }] });
|
|
}
|
|
});
|
|
|
|
// --- STATE MANAGEMENT ---
|
|
// Format: { "StreamName": [socketId1, socketId2, ...] }
|
|
const streams = {};
|
|
const socketStreamMap = {}; // Helper: socketId -> StreamName (for fast lookups on disconnect)
|
|
const scores = {}; // socketId -> Health Score (0-100)
|
|
|
|
io.on('connection', (socket) => {
|
|
console.log(`User connected: ${socket.id}`);
|
|
scores[socket.id] = 80;
|
|
|
|
// 0. Send the list of streams to the new user
|
|
socket.emit('stream_list_update', Object.keys(streams));
|
|
|
|
// A. Start New Stream
|
|
socket.on('start_stream', (streamName) => {
|
|
if (streams[streamName]) {
|
|
socket.emit('error_msg', "Stream name already exists.");
|
|
return;
|
|
}
|
|
if (socketStreamMap[socket.id]) {
|
|
socket.emit('error_msg', "You are already in a stream.");
|
|
return;
|
|
}
|
|
|
|
streams[streamName] = [socket.id];
|
|
socketStreamMap[socket.id] = streamName;
|
|
|
|
console.log(`Stream started: ${streamName} by ${socket.id}`);
|
|
socket.emit('role_assigned', 'head');
|
|
|
|
// Broadcast new list to everyone in the lobby
|
|
io.emit('stream_list_update', Object.keys(streams));
|
|
});
|
|
|
|
// B. Join Existing Stream
|
|
socket.on('join_stream', (streamName) => {
|
|
const chain = streams[streamName];
|
|
if (!chain) {
|
|
socket.emit('error_msg', "Stream does not exist.");
|
|
return;
|
|
}
|
|
|
|
const upstreamPeerId = chain[chain.length - 1];
|
|
chain.push(socket.id);
|
|
socketStreamMap[socket.id] = streamName;
|
|
|
|
console.log(`User ${socket.id} joined stream "${streamName}". Upstream: ${upstreamPeerId}`);
|
|
|
|
// Tell Upstream to connect to ME
|
|
io.to(upstreamPeerId).emit('connect_to_downstream', { downstreamId: socket.id });
|
|
|
|
// Request Keyframe (Targeting the Head of THIS stream)
|
|
setTimeout(() => {
|
|
if (streams[streamName] && streams[streamName][0]) {
|
|
io.to(streams[streamName][0]).emit('request_keyframe');
|
|
}
|
|
}, 2000);
|
|
});
|
|
|
|
// C. Signaling
|
|
socket.on('signal_msg', ({ target, type, sdp, candidate }) => {
|
|
io.to(target).emit('signal_msg', { sender: socket.id, type, sdp, candidate });
|
|
});
|
|
|
|
// D. Score Updates
|
|
socket.on('update_score', (score) => {
|
|
scores[socket.id] = score;
|
|
});
|
|
|
|
// E. Keyframe Relay
|
|
socket.on('relay_keyframe_upstream', () => {
|
|
const streamName = socketStreamMap[socket.id];
|
|
if (streamName && streams[streamName]) {
|
|
io.to(streams[streamName][0]).emit('request_keyframe');
|
|
}
|
|
});
|
|
|
|
// F. Disconnects (Healing)
|
|
socket.on('disconnect', () => {
|
|
delete scores[socket.id];
|
|
const streamName = socketStreamMap[socket.id];
|
|
|
|
if (!streamName) return; // User wasn't in a stream
|
|
|
|
const chain = streams[streamName];
|
|
if (!chain) return;
|
|
|
|
const index = chain.indexOf(socket.id);
|
|
if (index === -1) return;
|
|
|
|
console.log(`User ${socket.id} left stream "${streamName}". Index: ${index}`);
|
|
|
|
// Case 1: Head Left -> Destroy Stream
|
|
if (index === 0) {
|
|
delete streams[streamName];
|
|
// Notify everyone in this stream that it ended
|
|
chain.forEach(peerId => {
|
|
if (peerId !== socket.id) io.to(peerId).emit('stream_ended');
|
|
delete socketStreamMap[peerId];
|
|
});
|
|
io.emit('stream_list_update', Object.keys(streams)); // Update Lobby
|
|
return;
|
|
}
|
|
|
|
// Case 2: Tail Left -> Just pop
|
|
if (index === chain.length - 1) {
|
|
chain.pop();
|
|
const newTail = chain[chain.length - 1];
|
|
if (newTail) io.to(newTail).emit('disconnect_downstream');
|
|
}
|
|
// Case 3: Middle Left -> Stitch
|
|
else {
|
|
const upstreamNode = chain[index - 1];
|
|
const downstreamNode = chain[index + 1];
|
|
chain.splice(index, 1); // Remove node
|
|
|
|
console.log(`Healing "${streamName}": ${upstreamNode} -> ${downstreamNode}`);
|
|
io.to(upstreamNode).emit('connect_to_downstream', { downstreamId: downstreamNode });
|
|
|
|
setTimeout(() => {
|
|
if (streams[streamName] && streams[streamName][0]) {
|
|
io.to(streams[streamName][0]).emit('request_keyframe');
|
|
}
|
|
}, 2000);
|
|
}
|
|
|
|
delete socketStreamMap[socket.id];
|
|
});
|
|
|
|
socket.on('join_monitor', () => {
|
|
socket.join('monitors');
|
|
// Send immediate state
|
|
socket.emit('monitor_update', { streams, scores });
|
|
});
|
|
});
|
|
|
|
// 2. Broadcast State to Monitors (1Hz)
|
|
// We send the full topology and health scores every second.
|
|
setInterval(() => {
|
|
// Only emit if there is someone watching to save bandwidth
|
|
const monitorRoom = io.sockets.adapter.rooms.get('monitors');
|
|
if (monitorRoom && monitorRoom.size > 0) {
|
|
io.to('monitors').emit('monitor_update', { streams, scores });
|
|
}
|
|
}, 1000);
|
|
|
|
// --- THE MULTI-STREAM OPTIMIZER ---
|
|
setInterval(() => {
|
|
// Loop through EVERY active stream
|
|
Object.keys(streams).forEach(streamName => {
|
|
const chain = streams[streamName];
|
|
|
|
if (chain.length < 3) return;
|
|
|
|
// Scan this specific chain
|
|
for (let i = 1; i < chain.length - 1; i++) {
|
|
const current = chain[i];
|
|
const next = chain[i + 1];
|
|
|
|
const scoreCurrent = scores[current] || 0;
|
|
const scoreNext = scores[next] || 0;
|
|
|
|
if (scoreNext > scoreCurrent + 15) {
|
|
console.log(`[${streamName}] Swapping ${current} (${scoreCurrent}) with ${next} (${scoreNext})`);
|
|
performSwap(chain, i, i + 1);
|
|
break; // One swap per stream per cycle
|
|
}
|
|
}
|
|
});
|
|
}, 10000);
|
|
|
|
function performSwap(chain, indexA, indexB) {
|
|
const nodeA = chain[indexA];
|
|
const nodeB = chain[indexB];
|
|
chain[indexA] = nodeB;
|
|
chain[indexB] = nodeA;
|
|
|
|
const upstreamNode = chain[indexA - 1];
|
|
const downstreamNode = chain[indexB + 1];
|
|
|
|
if (upstreamNode) io.to(upstreamNode).emit('connect_to_downstream', { downstreamId: nodeB });
|
|
io.to(nodeB).emit('connect_to_downstream', { downstreamId: nodeA });
|
|
|
|
if (downstreamNode) {
|
|
io.to(nodeA).emit('connect_to_downstream', { downstreamId: downstreamNode });
|
|
} else {
|
|
io.to(nodeA).emit('disconnect_downstream');
|
|
}
|
|
|
|
// Request Keyframe from Head
|
|
setTimeout(() => {
|
|
if (chain[0]) io.to(chain[0]).emit('request_keyframe');
|
|
}, 1000);
|
|
setTimeout(() => {
|
|
if (chain[0]) io.to(chain[0]).emit('request_keyframe');
|
|
}, 4000);
|
|
}
|
|
|
|
const PORT = process.env.PORT || 3000;
|
|
server.listen(PORT, () => console.log(`Server running on http://localhost:${PORT}`)); |