From 45d6eb5e8a069019e6087f2d36027295a73ee586 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=8B=AC=EC=9E=AC=EC=97=BD?= <jysim0326@ajou.ac.kr> Date: Sun, 8 Dec 2024 15:56:04 +0900 Subject: [PATCH] =?UTF-8?q?refactor:=20=EC=A4=91=EB=B3=B5=20=EC=BD=94?= =?UTF-8?q?=EB=93=9C=20=ED=86=B5=ED=95=A9,=20rabbitmq=20=EC=97=B0=EA=B2=B0?= =?UTF-8?q?=20=EC=9C=A0=EC=A7=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- wsServer.js | 651 ++++++++++++++++++++++++++++++++-------------------- 1 file changed, 401 insertions(+), 250 deletions(-) diff --git a/wsServer.js b/wsServer.js index e938ef8..fa69548 100644 --- a/wsServer.js +++ b/wsServer.js @@ -5,22 +5,21 @@ const mongoose = require('mongoose'); const admin = require('firebase-admin'); const dotenv = require('dotenv'); const amqp = require('amqplib'); // RabbitMQ 연결 -const ChatRoom = require('./schemas/chatRooms'); +const ChatRoom = require('./schemas/ChatRooms'); // .env 파일 로드 dotenv.config(); -// 서비스 계정 키 파일 경로를 환경 변수에서 가져오기 -const serviceAccountPath = process.env.FIREBASE_CREDENTIAL_PATH; +const HEARTBEAT_TIMEOUT = 10000; // 10초 후 타임아웃 -// Firebase Admin SDK 초기화 -admin.initializeApp({ - credential: admin.credential.cert(require(serviceAccountPath)), -}); +// RabbitMQ 연결 풀 생성 +let amqpConnection, amqpChannel; // WebSocket 관련 데이터 let clients = []; -let chatRooms = {}; + +// 클라이언트 상태를 저장하는 Map +const clientHeartbeats = new Map(); // MongoDB 연결 설정 async function connectMongoDB() { @@ -39,14 +38,35 @@ async function connectMongoDB() { } } -// RabbitMQ 메시지 발행 함수 +// // RabbitMQ 메시지 발행 함수 +// async function publishToQueue(queue, message) { +// const connection = await amqp.connect(process.env.RABBITMQ_URL || 'amqp://localhost'); +// const channel = await connection.createChannel(); +// await channel.assertQueue(queue, { durable: true }); +// channel.sendToQueue(queue, Buffer.from(JSON.stringify(message))); +// console.log(`Message sent to queue ${queue}:`, message); +// setTimeout(() => connection.close(), 500); // 연결 닫기 +// } + +async function setupRabbitMQ() { + try { + amqpConnection = await amqp.connect(process.env.RABBITMQ_URL || 'amqp://localhost'); + amqpChannel = await amqpConnection.createChannel(); + console.log('RabbitMQ connection established'); + } catch (err) { + logError('RabbitMQ Setup', err); + process.exit(1); + } +} + async function publishToQueue(queue, message) { - const connection = await amqp.connect(process.env.RABBITMQ_URL || 'amqp://localhost'); - const channel = await connection.createChannel(); - await channel.assertQueue(queue, { durable: true }); - channel.sendToQueue(queue, Buffer.from(JSON.stringify(message))); - console.log(`Message sent to queue ${queue}:`, message); - setTimeout(() => connection.close(), 500); // 연결 닫기 + try { + await amqpChannel.assertQueue(queue, { durable: true }); + amqpChannel.sendToQueue(queue, Buffer.from(JSON.stringify(message))); + console.log(`Message sent to queue ${queue}:`, message); + } catch (err) { + logError('RabbitMQ Publish', err); + } } // RabbitMQ를 통해 푸시 알림 요청을 전송하는 함수 @@ -67,237 +87,355 @@ async function getChatHistory(chatRoomId) { return chatRoom ? chatRoom.messages : []; } -// WebSocket 서버 생성 및 핸드셰이크 처리 function startWebSocketServer() { - const wsServer = http.createServer((req, res) => { + const server = http.createServer((req, res) => { res.writeHead(200, { 'Content-Type': 'text/plain' }); res.end('WebSocket server is running'); }); - wsServer.on('upgrade', (req, socket, head) => { - const key = req.headers['sec-websocket-key']; - const acceptKey = generateAcceptValue(key); - const responseHeaders = [ - 'HTTP/1.1 101 Switching Protocols', - 'Upgrade: websocket', - 'Connection: Upgrade', - `Sec-WebSocket-Accept: ${acceptKey}` - ]; - socket.write(responseHeaders.join('\r\n') + '\r\n\r\n'); - - // 클라이언트를 clients 배열에 추가 - clients.push(socket); - - let chatRoomId = null; - let nickname = null; - - socket.on('data', async buffer => { - let message; - try { - message = parseMessage(buffer); - const parsedData = JSON.parse(message); - const { type, chatRoomId: clientChatRoomId, nickname: clientNickname, text } = parsedData; - - console.log('서버에서 수신한 메시지:', { type, clientChatRoomId, clientNickname, text }); - - if (type === 'join' || type === 'leave') { - await ChatRoom.updateOne( - { chatRoomId: clientChatRoomId }, - { $set: { [`isOnline.${clientNickname}`]: type === 'join' } } - ); - - const statusMessage = { - type: 'status', - chatRoomId: clientChatRoomId, - nickname: clientNickname, - isOnline: type === 'join', - }; - - clients.forEach(client => { - client.write(constructReply(JSON.stringify(statusMessage))); - }); - } - - if (type === 'join') { - chatRoomId = clientChatRoomId; - nickname = clientNickname; - - await ChatRoom.updateOne( - { chatRoomId }, - { - $set: { - [`isOnline.${nickname}`]: true, - [`lastReadLogId.${nickname}`]: null, - }, - } - ); - - if (!chatRooms[chatRoomId]) { - chatRooms[chatRoomId] = []; - } - - const chatRoom = await ChatRoom.findOne({ chatRoomId }); - - // 참가자 확인 - const participantIndex = chatRoom.participants.findIndex(participant => participant.name === nickname); - if (participantIndex !== -1) { - const existingParticipant = chatRoom.participants[participantIndex]; - - // 참가자 상태 업데이트 - existingParticipant.isOnline = true; - existingParticipant.lastReadAt = new Date(); - - await chatRoom.save(); - } else { - // 새 참가자 추가 - const joinMessage = { - message: `${nickname}님이 참가했습니다.`, - timestamp: new Date(), - type: 'join' - }; - - chatRoom.participants.push({ - name: nickname, - fcmTokens: parsedData.fcmToken ? [parsedData.fcmToken] : [], - lastReadAt: new Date(), - lastReadLogId: null, - isOnline: true, - }); - - chatRoom.messages.push(joinMessage); - - await chatRoom.save(); - - clients.forEach(client => { - client.write(constructReply(JSON.stringify(joinMessage))); - }); - - console.log(`${nickname} 새 참가자로 추가`); - } - - try { - const previousMessages = await getChatHistory(chatRoomId); - if (previousMessages.length > 0) { - socket.write(constructReply(JSON.stringify({ type: 'previousMessages', messages: previousMessages }))); - console.log(`이전 메시지 전송: ${previousMessages.length}개`); - } - } catch (err) { - console.error('이전 채팅 기록 불러오기 중 오류 발생:', err); - } - - } else if (type === 'message') { - const chatMessage = { - message: text, - timestamp: new Date(), - type: 'message', - sender: nickname - }; - - - chatRooms[chatRoomId].push(chatMessage); - - try { - // 새로운 메시지를 messages 배열에 추가 - const updatedChatRoom = await ChatRoom.findOneAndUpdate( - { chatRoomId }, - { $push: { messages: chatMessage } }, - { new: true, fields: { "messages": { $slice: -1 } } } // 마지막 추가된 메시지만 가져옴 - ); - - // 마지막에 추가된 메시지의 _id를 가져오기 - const savedMessage = updatedChatRoom.messages[updatedChatRoom.messages.length - 1]; - - // 새로운 메시지 전송: 클라이언트로 메시지 브로드캐스트 - const messageData = { - type: 'message', - chatRoomId, - sender: nickname, - message: text, - timestamp: chatMessage.timestamp, - _id: savedMessage._id // 저장된 메시지의 _id 사용 - }; - - clients.forEach(client => { - client.write(constructReply(JSON.stringify(messageData))); - console.log('채팅 메시지 전송:', messageData); - }); - - // 오프라인 사용자에게 FCM 푸시 알림 전송 - const chatRoom = await ChatRoom.findOne({ chatRoomId }); - const offlineParticipants = chatRoom.participants.filter(participant => { - // isOnline 상태를 Map에서 가져오기 - const isOnline = chatRoom.isOnline.get(participant.name); - return isOnline === false; // 정확히 false인 사용자만 필터링 - }); - - console.log("offlineParticipants", offlineParticipants); - - // RabbitMQ에 푸시 알림 요청 발행 - await sendPushNotificationRequest(chatRoom.chatRoomName, clientNickname, text, offlineParticipants, chatRoomId); - } catch (err) { - console.error('MongoDB 채팅 메시지 저장 오류:', err); - } - } else if (type === 'leave') { - const leaveMessage = { - message: `${nickname}님이 퇴장했습니다.`, - timestamp: new Date(), - type: 'leave' - }; - - chatRooms[chatRoomId].push(leaveMessage); - - await ChatRoom.updateOne( - { chatRoomId }, - { $set: { [`isOnline.${nickname}`]: false } } - ); - - await ChatRoom.updateOne({ chatRoomId }, { - $push: { messages: leaveMessage }, - $pull: { participants: nickname } - }); - - clients.forEach(client => { - client.write(constructReply(JSON.stringify(leaveMessage))); - }); - - clients = clients.filter(client => client !== socket); - } - } catch (err) { - console.error('메시지 처리 중 오류 발생:', err); - } - }); + server.on('upgrade', (req, socket, head) => { + handleWebSocketUpgrade(req, socket); + }); - socket.on('close', async () => { - if (nickname && chatRoomId) { - await ChatRoom.updateOne( - { chatRoomId }, - { $set: { [`isOnline.${nickname}`]: false } } - ); - - const statusMessage = { - type: 'status', - chatRoomId, - nickname, - isOnline: false, - }; - - clients.forEach(client => { - client.write(constructReply(JSON.stringify(statusMessage))); - }); - } + server.listen(8081, () => { + console.log('WebSocket 채팅 서버가 8081 포트에서 실행 중입니다.'); + }); +} + +function handleWebSocketUpgrade(req, socket) { + const key = req.headers['sec-websocket-key']; + const acceptKey = generateAcceptValue(key); + const responseHeaders = [ + 'HTTP/1.1 101 Switching Protocols', + 'Upgrade: websocket', + 'Connection: Upgrade', + `Sec-WebSocket-Accept: ${acceptKey}` + ]; + + socket.write(responseHeaders.join('\r\n') + '\r\n\r\n'); + + // 클라이언트를 clients 배열에 추가 + clients.push(socket); + + socket.on('data', async buffer => { + try { + message = parseMessage(buffer); + if (!message) return; // 메시지가 비어 있는 경우 무시 + + const parsedData = JSON.parse(message); + const { type, chatRoomId: clientChatRoomId, nickname: clientNickname, text } = parsedData; + await handleClientMessage(socket, parsedData); + } catch (err) { + console.error('Error processing message:', err); + } + }); + + socket.on('close', async () => { + + console.log(`WebSocket 연결이 종료되었습니다: ${socket.nickname}, ${socket.chatRoomId}`); + + // 클라이언트 Heartbeat 맵에서 제거 + clientHeartbeats.delete(socket); + + // 클라이언트 목록에서 제거 + clients = clients.filter((client) => client !== socket); + + + // 소켓 종료 전, 창 닫기 or hidden 때문에 이미 온라인 상태 false로 됨 (중복 로직 주석 처리) + // if (socket.nickname && socket.chatRoomId) { + // await ChatRoom.updateOne( + // { chatRoomId: socket.chatRoomId }, + // { $set: { [`isOnline.${socket.nickname}`]: false } } + // ); + // } + }); + + socket.on('error', (err) => { + console.error(`WebSocket error: ${err}`); + clients = clients.filter((client) => client !== socket); + }); + +} + +// 메시지 타입 처리 +async function handleClientMessage(socket, data) { + const { type, chatRoomId, nickname, text, fcmToken } = data; + + // 타임아웃된 소켓 차단 + if (socket.isTimedOut) { + console.log(`타임아웃된 클라이언트의 재연결을 차단: ${nickname}`); + return; + } + + switch (type) { + case 'heartbeat': + // console.log(`Heartbeat received from ${nickname} in room ${chatRoomId}`); + clientHeartbeats.set(socket, Date.now()); + break; + case 'join': + // WebSocket에 사용자 정보 저장 + // socket.nickname = nickname; + // socket.chatRoomId = chatRoomId; + await handleJoin(socket, chatRoomId, nickname, fcmToken); + break; + case 'message': + await handleMessage(chatRoomId, nickname, text); + break; + case 'leave': + await handleLeave(chatRoomId, nickname); + break; + case 'notice': + await handleSetNotice(chatRoomId, nickname, text); + break; + default: + console.log(`Unknown message type: ${type}`); + } +} + +// join - 참가 메시지 +async function handleJoin(socket, chatRoomId, nickname) { + if (socket.isTimedOut) { + console.log(`타임아웃된 클라이언트의 재참여를 차단: ${nickname}`); + return; + } + + // Set client properties + socket.chatRoomId = chatRoomId; + socket.nickname = nickname; + + console.log(`Client joined room: ${chatRoomId}, nickname: ${nickname}`); + + await ChatRoom.updateOne( + { chatRoomId: chatRoomId }, + { $set: { [`isOnline.${nickname}`]:true } } + ); + + const statusMessage = { + type: 'status', + chatRoomId: chatRoomId, + nickname: nickname, + isOnline: true, + }; + + broadcastMessage(chatRoomId, statusMessage); + + await ChatRoom.updateOne( + { chatRoomId }, + { $set: { + [`isOnline.${nickname}`]: true, + [`lastReadLogId.${nickname}`]: null, + }, + } + ); + + const chatRoom = await ChatRoom.findOne({ chatRoomId }); + + // 참가자 확인 + const participantIndex = chatRoom.participants.findIndex(participant => participant.name === nickname); + + if (participantIndex !== -1) { + const existingParticipant = chatRoom.participants[participantIndex]; + + // 참가자 상태 업데이트 + existingParticipant.isOnline = true; + existingParticipant.lastReadAt = new Date(); + + await chatRoom.save(); + } else { + // 새 참가자 추가 + const joinMessage = { + message: `${nickname}님이 참가했습니다.`, + timestamp: new Date(), + type: 'join' + }; + + chatRoom.participants.push({ + name: nickname, + fcmTokens: parsedData.fcmToken ? [parsedData.fcmToken] : [], + lastReadAt: new Date(), + lastReadLogId: null, + isOnline: true, + }); + + chatRoom.messages.push(joinMessage); + + await chatRoom.save(); + + broadcastMessage(chatRoomId, joinMessage); + + console.log(`${nickname} 새 참가자로 추가`); + } + + const previousMessages = await getChatHistory(chatRoomId); + if (previousMessages.length > 0) { + socket.write(constructReply(JSON.stringify({ type: 'previousMessages', messages: previousMessages }))); + } +} + +// meessage - 일반 메시지 +async function handleMessage(chatRoomId, nickname, text) { + const chatMessage = { message: text, timestamp: new Date(), type: 'message', sender: nickname }; + + try { + const updatedChatRoom = await ChatRoom.findOneAndUpdate( + { chatRoomId }, + { $push: { messages: chatMessage } }, + { new: true, fields: { messages: { $slice: -1 } } } + ); + + // 마지막에 추가된 메시지의 _id를 가져오기 + const savedMessage = updatedChatRoom.messages[updatedChatRoom.messages.length - 1]; + + // 새로운 메시지 전송: 클라이언트로 메시지 브로드캐스트 + const messageData = { + type: 'message', + chatRoomId, + sender: nickname, + message: text, + timestamp: chatMessage.timestamp, + _id: savedMessage._id // 저장된 메시지의 _id 사용 + }; + + console.log('채팅에서 Current clients:', clients.map(client => client.chatRoomId)); + + // broadcastMessage(chatRoomId, messageData); + + clients.forEach(client => { + client.write(constructReply(JSON.stringify(messageData))); + console.log('채팅 메시지 전송:', messageData); }); - socket.on('error', (err) => { - console.error(`WebSocket error: ${err}`); - clients = clients.filter(client => client !== socket); + // 오프라인 사용자에게 FCM 푸시 알림 전송 + const chatRoom = await ChatRoom.findOne({ chatRoomId }); + const offlineParticipants = chatRoom.participants.filter(participant => { + // isOnline 상태를 Map에서 가져오기 + const isOnline = chatRoom.isOnline.get(participant.name); + return isOnline === false; // 정확히 false인 사용자만 필터링 }); + + console.log("offlineParticipants", offlineParticipants); + + // RabbitMQ에 푸시 알림 요청 발행 + await sendPushNotificationRequest(chatRoom.chatRoomName, nickname, text, offlineParticipants, chatRoomId); + + } catch (err) { + console.error('Error saving message to MongoDB:', err); + } +} + +// leave - 퇴장 메시지 +async function handleLeave(chatRoomId, nickname) { + await ChatRoom.updateOne( + { chatRoomId: clientChatRoomId }, + { $set: { [`isOnline.${clientNickname}`]: type === 'leave' } } + ); + + const statusMessage = { + type: 'status', + chatRoomId: clientChatRoomId, + nickname: clientNickname, + isOnline: type === 'leave', + }; + + clients.forEach(client => { + client.write(constructReply(JSON.stringify(statusMessage))); }); - wsServer.listen(8081, () => { - console.log('WebSocket 채팅 서버가 8081 포트에서 실행 중입니다.'); + const leaveMessage = { message: `${nickname} 님이 퇴장했습니다.`, timestamp: new Date(), type: 'leave' }; + await ChatRoom.updateOne({ chatRoomId }, { $push: { messages: leaveMessage } }); + broadcastMessage(chatRoomId, leaveMessage); +} + +async function handleSetNotice(chatRoomId, sender, message) { + const notice = { + sender, + message, + timestamp: new Date(), + }; + + try { + // MongoDB에 최신 공지 저장 + await ChatRoom.updateOne( + { chatRoomId }, + { $push: { notices: notice } } + ); + + // 모든 클라이언트에게 공지사항 업데이트 메시지 전송 + const noticeMessage = { + type: 'notice', + chatRoomId, + sender, + message, + }; + + clients.forEach(client => { + client.write(constructReply(JSON.stringify(noticeMessage))); + }); + + // broadcastMessage(chatRoomId, noticeMessage); + + console.log('공지사항 업데이트:', noticeMessage); + } catch (error) { + console.error('공지사항 업데이트 실패:', error); + } +} + +// Broadcast message to clients in the same chat room +function broadcastMessage(chatRoomId, message) { + clients.forEach((client) => { + if (client.chatRoomId === chatRoomId) { + client.write(constructReply(JSON.stringify(message))); + } }); } +// 주기적으로 Heartbeat 상태 확인 +setInterval(async () => { + const now = Date.now(); + for (const [socket, lastHeartbeat] of clientHeartbeats.entries()) { + if (now - lastHeartbeat > HEARTBEAT_TIMEOUT) { + console.log('타임아웃 대상 클라이언트:', { + nickname: socket.nickname, + chatRoomId: socket.chatRoomId, + lastHeartbeat: new Date(lastHeartbeat).toISOString(), + }); + + // Heartbeat 맵에서 제거 + clientHeartbeats.delete(socket); + + // 상태 플래그 설정 + socket.isTimedOut = true; + + // 소켓 연결 종료 + socket.end(); + + // 클라이언트 목록에서 제거 + clients = clients.filter((client) => client !== socket); + + // 클라이언트를 오프라인으로 설정 + console.log("Client timed out 후 오프라인 설정"); + await ChatRoom.updateOne( + { [`isOnline.${socket.nickname}`]: false }, + { [`lastReadAt.${socket.nickname}`]: new Date() } + ); + + // 클라이언트에게 연결 종료 메시지 전송 + const timeoutMessage = JSON.stringify({ + type: 'status', + nickname: socket.nickname, + chatRoomId: socket.chatRoomId, + isOnline: false, + }); + + clients.forEach(client => { + client.write(constructReply(timeoutMessage)); + }); + + + } + } +}, 5000); // 5초마다 상태 확인 + // Sec-WebSocket-Accept 헤더 값 생성 -> env처리 function generateAcceptValue(key) { return crypto.createHash('sha1').update(key + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11', 'binary').digest('base64'); @@ -305,27 +443,37 @@ function generateAcceptValue(key) { // WebSocket 메시지 파싱 함수 function parseMessage(buffer) { - const byteArray = [...buffer]; - const secondByte = byteArray[1]; - let length = secondByte & 127; - let maskStart = 2; - - if (length === 126) { - length = (byteArray[2] << 8) + byteArray[3]; - maskStart = 4; - } else if (length === 127) { - length = 0; - for (let i = 0; i < 8; i++) { - length = (length << 8) + byteArray[2 + i]; + try { + const byteArray = [...buffer]; + const secondByte = byteArray[1]; + let length = secondByte & 127; + let maskStart = 2; + + if (length === 126) { + length = (byteArray[2] << 8) + byteArray[3]; + maskStart = 4; + } else if (length === 127) { + length = 0; + for (let i = 0; i < 8; i++) { + length = (length << 8) + byteArray[2 + i]; + } + maskStart = 10; } - maskStart = 10; - } - const dataStart = maskStart + 4; - const mask = byteArray.slice(maskStart, dataStart); - const data = byteArray.slice(dataStart, dataStart + length).map((byte, i) => byte ^ mask[i % 4]); + const dataStart = maskStart + 4; + const mask = byteArray.slice(maskStart, dataStart); + const data = byteArray.slice(dataStart, dataStart + length).map((byte, i) => byte ^ mask[i % 4]); + + const decodedMessage = new TextDecoder('utf-8').decode(Uint8Array.from(data)); + + // JSON 유효성 검사 + JSON.parse(decodedMessage); - return new TextDecoder('utf-8').decode(Uint8Array.from(data)); + return decodedMessage; + } catch (err) { + console.error('Error parsing WebSocket message:', err.message); + return null; // 유효하지 않은 메시지는 무시 + } } // 클라이언트 메시지 응답 생성 함수 @@ -353,5 +501,8 @@ function constructReply(message) { return Buffer.concat([Buffer.from(reply), messageBuffer]); } +// 서버 시작 시 RabbitMQ 설정 +setupRabbitMQ(); + // MongoDB 연결 후 WebSocket 서버 시작 connectMongoDB(); \ No newline at end of file -- GitLab