node.js 와 socket.io 그리고 cluster
조회수 2755회
node.js에서 socket.io를 사용해서 메세지 송수신을 할 때, 접속한 소켓들을 변수에 담을 경우 cluster로 생성된 worker 간에 공유가 되지 않으므로 접속한 소켓들을 DB등에 담아야 하겠네요?
정상적인 로직에 cluster 만 추가했는데 소켓 접속을 계속 시도하는 로그가 찍혀서 그런가보다 추측을 하고 있습니다.
/**
* Created by kmk on 2016-05-16.
*/
// mongo db setting
var MongoClient = require('mongodb').MongoClient;
var assert = require('assert');
var mongoUrl = 'mongodb://localhost:27017/push';
// socket.io setting
var app = require('http').createServer(function(_req, _res){
_res.writeHead(404, {"Content-Type": "text/plain"});
_res.write("404 Not Found\n");
_res.end();
});
var io = require('socket.io')(app);
var fs = require('fs');
app.listen(8080);
// 관리자 소켓 아이디 임시 저장 배열
var adminSid = [];
var adminIp = "127.0.0.1";
// 소켓 Client IP
var sip = "";
// 현재 시간 가져오기
var now = function()
{
var moment = require('moment');
return moment().format("YYYY-MM-DD HH:mm:ss");
};
var userFind = function(_db, _data)
{
return _db.collection('users').find(_data);
};
var messageListFind = function(_db, _data, _skip)
{
return _db.collection('messages').find(_data).sort({_id:-1}).skip(_skip).limit(10);
};
/*
* 접속 된 관리자의 소켓 아이디를 임시 저장
* 사용자 정보가 갱신 될 때, 저장 된 관리자 소켓 아이디에 갱신 된 정보 Push 하기 위함
*/
var adminProc = function(_iCSN, _sid, _type)
{
if (isAdmin(_iCSN)===false){
return;
}
if (_type=="join"){
adminSid[_iCSN] = _sid;
} else if (_type=="out"){
delete adminSid[_iCSN];
}
};
/*
아이피하고 iCSN이 숫자가 아닌 경우 관리자로 처리
관리자페이지에서 접속할 경우 클라이언트에서 iCSN에 아이디 값을 넣어 호출
관리자아이디로 게임을 접속할 경우는 일반 회원과 같이 처리
*/
var isAdmin = function(_iCSN){
var re = /^[0-9]+$/i;
if (sip===adminIp && re.test(_iCSN) === false){
return true;
} else {
return false;
}
};
// 사용자 접속 처리 (_joinData : iCSN, gamecode, server)
var joinUser = function(_joinData, _socket, _callback)
{
// 사용자 접속 로그
console.log("join : ["+_joinData.iCSN+" / "+_joinData.gamecode+" / "+_joinData.server+"]");
// 관리자인지 체크해서 관리자 소켓 임시 저장
adminProc(_joinData.iCSN, _socket.id, "join");
// DB 저장
MongoClient.connect(mongoUrl, function(err, db)
{
assert.equal(null, err);
_joinData.sid = _socket.id;
_joinData.connectTime = now();
db.collection('users').update(
{
"iCSN" : _joinData.iCSN,
"gamecode": _joinData.gamecode,
"server" : _joinData.server
},
{$set: _joinData},
{upsert:true}
);
// 접속자 정보를 관리자에 전송 (접속자는 소켓정보가 있고, iCSN이 숫자인 경우만 접속자로 처리)
userFind(db, {"sid":/^\/#/i, "iCSN":{$regex:/^[0-9]+$/i}}).toArray(function(err, result){
_callback(result);
//console.log(result);
});
});
};
// 사용자 종료 처리
var outUser = function(_socket, _callback)
{
MongoClient.connect(mongoUrl, function(err, db)
{
assert.equal(null, err);
var userFindData = {
sid: _socket.id
};
userFind(db, userFindData).toArray(function(err, result)
{
var _outUserData = {
"iCSN" : result[0].iCSN,
"gamecode" : result[0].gamecode,
"server" : result[0].server,
"sid" : "",
"disconnectTime": now()
};
db.collection('users').update(
{
"iCSN" : _outUserData.iCSN,
"gamecode": _outUserData.gamecode,
"server" : _outUserData.server
},
{$set: _outUserData},
{upsert:true}
);
// 사용자 종료 로그
console.log("out : ["+_outUserData.iCSN+" / "+_outUserData.gamecode+" / "+_outUserData.server+"]");
// 관리자인지 체크해서 해당 관리자 소켓 삭제
adminProc(_outUserData.iCSN, _socket.id, "out");
_callback(db);
});
});
};
// 사용자 접속상태 체크
var sendMsg = function(_data, _isBroad){
if (sip!==adminIp){
return;
}
MongoClient.connect(mongoUrl, function(err, db)
{
assert.equal(null, err);
if (_isBroad){
insertMessage(_data);
io.sockets.emit("userGetMsg", _data.msg);
return;
} else {
var checkOnlineData = {
iCSN : _data.iCSN,
gamecode: _data.gamecode,
server : _data.server
};
}
userFind(db, checkOnlineData).toArray(function(err, result)
{
try {
if (result[0].sid != ""){
console.log("online : "+_data.iCSN+" / "+_data.gamecode+" / "+_data.server);
io.sockets.connected[result[0].sid].emit("userGetMsg", _data.msg);
} else {
console.log("offline : "+_data.iCSN+" / "+_data.gamecode+" / "+_data.server);
}
insertMessage(_data);
} catch (e){
console.log("offline : "+_data.iCSN+" / "+_data.gamecode+" / "+_data.server);
insertMessage(_data);
}
});
});
};
// 발송 메세지 DB 저장
var insertMessage = function(_data)
{
MongoClient.connect(mongoUrl, function(err, db)
{
assert.equal(null, err);
_data.regDate = now();
db.collection('messages').insert(_data);
});
};
// 사용자 메세지 리스트 가져오기
function getMessageLists(_data, _skip, _callback)
{
MongoClient.connect(mongoUrl, function(err, db)
{
assert.equal(null, err);
var data1 = {
iCSN :_data.iCSN,
gamecode:_data.gamecode,
server :_data.server
};
var data2 = {
iCSN:{$exists:false}
};
messageListFind(db, {$or : [data1,data2]}, _skip).toArray(function(err, result)
{
//console.log(result);
_callback(result);
});
});
}
io.on('connection', function(socket)
{
// 접속자 IP
sip = socket.request.socket.remoteAddress.split(":")[socket.request.socket.remoteAddress.split(":").length-1];
// 클라이언트 접속시 최초 호출
socket.emit('connUser');
// 접속 호출을 받은 클라언트에서 접속 처리 요청
socket.on('joinUser', function(data)
{
// users DB에 저장
joinUser(data, socket, function(_result){
Object.keys(adminSid).forEach(function (key) {
//console.log(adminSid[key]);
io.sockets.connected[adminSid[key]].emit("userList", _result);
});
});
// 최초 접속 시, 기존 메세지 전달
getMessageLists(data, 0, function(_result){
//console.log(Object.keys(_result).length);
//console.log(_result);
//_result = _result.reverse();
socket.emit('storedMsg', _result);
});
});
// 메세지 더보기 요청
socket.on('getOldMsg', function(data){
console.log("getOldMsg Skip : "+data.skip);
getMessageLists(data.param, data.skip, function(_result){
//console.log(Object.keys(_result).length);
//console.log(_result);
//_result = _result.reverse();
socket.emit('storedMsg', _result);
});
});
// 관리자에서 특정 클라이언트에게 메세지 전송
socket.on('userMsg', function(data){
sendMsg(data, false);
});
// 관리자에서 전체 클라이언트에게 메세지 전송
socket.on('broadMsg', function(data){
sendMsg(data, true);
});
// 종료 콜백
socket.on('disconnect', function()
{
// 종료 회원 DB 처리
outUser(socket, function(_db){
// 접속자 정보를 관리자에 전송 (접속자는 소켓정보가 있고, iCSN이 숫자인 경우만 접속자로 처리)
userFind(_db, {"sid":/^\/#/i, "iCSN":{$regex:/^[0-9]+$/i}}).toArray(function(err, result){
Object.keys(adminSid).forEach(function (key) {
//console.log(adminSid[key]);
io.sockets.connected[adminSid[key]].emit("userList", result);
});
//console.log(result);
});
});
});
});
위 로직에
var cluster = require('cluster');
var numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
for (var i=0; i<numCPUs; i++) {
cluster.fork();
}
} else {
이 부분만 상단에 추가되었는데, 소켓접속을 계속 시도하는 로그가 찍히네요.
댓글 입력