187 lines
5.6 KiB
JavaScript

require('dotenv').config();
const express = require('express');
const http = require('http');
const { Server } = require("socket.io");
const axios = require('axios');
const app = express();
const server = http.createServer(app);
const io = new Server(server);
app.use(express.static('public'));
// --- Cloudflare Credentials ---
app.get('/api/get-turn-credentials', async (req, res) => {
try {
const response = await axios.post(
`https://rtc.live.cloudflare.com/v1/turn/keys/${process.env.CLOUDFLARE_APP_ID}/credentials/generate-ice-servers`,
{ ttl: 86400 },
{ headers: { 'Authorization': `Bearer ${process.env.CLOUDFLARE_API_TOKEN}`, 'Content-Type': 'application/json' } }
);
res.json(response.data);
} catch (error) {
console.error("Cloudflare Error:", error.message);
res.status(500).json({ error: "Failed to fetch credentials" });
}
});
// --- Daisy Chain Logic ---
let strand = []; // Ordered list of socket IDs
let scores = {}; // Map: socket.id -> score (0-100)
io.on('connection', (socket) => {
console.log(`User connected: ${socket.id}`);
// Default score for new users (optimistic, so they get a chance to prove themselves)
scores[socket.id] = 80;
socket.on('update_score', (score) => {
scores[socket.id] = score;
});
// A. Start Stream (Head)
socket.on('start_stream', () => {
if (strand.length > 0) {
socket.emit('error_msg', "Stream already in progress.");
return;
}
strand.push(socket.id);
console.log("Stream started. Head:", socket.id);
socket.emit('role_assigned', 'head');
});
// B. Join Stream (Tail)
socket.on('join_stream', () => {
if (strand.length === 0) {
socket.emit('error_msg', "No active stream to join.");
return;
}
const upstreamPeerId = strand[strand.length - 1];
strand.push(socket.id);
console.log(`User ${socket.id} joined. Upstream: ${upstreamPeerId}`);
// 1. Tell Upstream to connect to ME
io.to(upstreamPeerId).emit('connect_to_downstream', { downstreamId: socket.id });
// 2. Request Keyframe from Head (Delayed to allow connection setup)
setTimeout(() => {
if (strand[0]) io.to(strand[0]).emit('request_keyframe');
}, 2000);
});
// C. Signaling
socket.on('signal_msg', ({ target, type, sdp, candidate }) => {
io.to(target).emit('signal_msg', { sender: socket.id, type, sdp, candidate });
});
// D. Keyframe Relay
socket.on('relay_keyframe_upstream', () => {
// In a real robust app, you'd bubble this up the chain.
// Here we just blast the Source directly if we know who they are.
if (strand[0]) io.to(strand[0]).emit('request_keyframe');
});
// E. Disconnects (Healing)
socket.on('disconnect', () => {
const index = strand.indexOf(socket.id);
if (index === -1) return;
console.log(`User ${socket.id} disconnected. Index: ${index}`);
// Head left
if (index === 0) {
strand = [];
io.emit('stream_ended');
return;
}
// Tail left
if (index === strand.length - 1) {
strand.pop();
return;
}
// Middle left (Stitch)
const upstreamNode = strand[index - 1];
const downstreamNode = strand[index + 1];
strand.splice(index, 1);
console.log(`Healing: ${upstreamNode} -> ${downstreamNode}`);
io.to(upstreamNode).emit('connect_to_downstream', { downstreamId: downstreamNode });
setTimeout(() => {
if (strand[0]) io.to(strand[0]).emit('request_keyframe');
}, 2000);
});
});
// --- THE OPTIMIZER LOOP ---
setInterval(() => {
if (strand.length < 3) return; // Need at least Head + 2 Nodes to swap anything
// We iterate from the 2nd node (Index 1) to the end.
// Index 0 is HEAD (Source), we never move the source.
let swapped = false;
// We look for ONE swap per cycle to minimize chaos.
for (let i = 1; i < strand.length - 1; i++) {
const current = strand[i];
const next = strand[i + 1];
const scoreCurrent = scores[current] || 0;
const scoreNext = scores[next] || 0;
// THRESHOLD: Only swap if the next guy is significantly stronger (+15 points)
// This prevents users flip-flopping constantly due to minor fluctuations.
if (scoreNext > scoreCurrent + 15) {
console.log(`Swapping Weak ${current} (${scoreCurrent}) with Strong ${next} (${scoreNext})`);
performSwap(i, i + 1);
swapped = true;
break; // Stop after one swap to let network settle
}
}
}, 10000); // Run every 10 seconds
function performSwap(indexA, indexB) {
// 1. Update the Array (Logical Swap)
const nodeA = strand[indexA];
const nodeB = strand[indexB];
strand[indexA] = nodeB;
strand[indexB] = nodeA;
// State after swap:
// [ ... -> Upstream -> NodeB -> NodeA -> Downstream -> ... ]
// Identify the relevant neighbors
const upstreamNode = strand[indexA - 1]; // The node before the pair
const downstreamNode = strand[indexB + 1]; // The node after the pair
// 2. Instruct the Upstream to connect to the NEW first node (NodeB)
if (upstreamNode) {
io.to(upstreamNode).emit('connect_to_downstream', { downstreamId: nodeB });
}
// 3. Instruct NodeB to connect to NodeA (The internal link)
io.to(nodeB).emit('connect_to_downstream', { downstreamId: nodeA });
// 4. Instruct NodeA to connect to the Downstream (or null if end)
if (downstreamNode) {
io.to(nodeA).emit('connect_to_downstream', { downstreamId: downstreamNode });
} else {
// If NodeA is now the tail, it disconnects its downstream
io.to(nodeA).emit('disconnect_downstream'); // You might need to add this handler to client
}
// 5. Trigger Keyframes to heal the image
setTimeout(() => {
if (strand[0]) io.to(strand[0]).emit('request_keyframe');
}, 1000);
}
const PORT = process.env.PORT || 3000;
server.listen(PORT, () => console.log(`Server running on http://localhost:${PORT}`));