node.js 와 socket.io 그리고 cluster

조회수 2754회

node.js에서 socket.io를 사용해서 메세지 송수신을 할 때, 접속한 소켓들을 변수에 담을 경우 cluster로 생성된 worker 간에 공유가 되지 않으므로 접속한 소켓들을 DB등에 담아야 하겠네요?

정상적인 로직에 cluster 만 추가했는데 소켓 접속을 계속 시도하는 로그가 찍혀서 그런가보다 추측을 하고 있습니다.

/**
 * Created by kmk on 2016-05-16.
 */

// mongo db setting
var MongoClient = require('mongodb').MongoClient;
var assert = require('assert');
var mongoUrl = 'mongodb://localhost:27017/push';

// socket.io setting
var app = require('http').createServer(function(_req, _res){
    _res.writeHead(404, {"Content-Type": "text/plain"});
    _res.write("404 Not Found\n");
    _res.end();
});
var io = require('socket.io')(app);
var fs = require('fs');

app.listen(8080);

// 관리자 소켓 아이디 임시 저장 배열
var adminSid = [];
var adminIp = "127.0.0.1";

// 소켓 Client IP
var sip = "";

// 현재 시간 가져오기
var now = function()
{
    var moment = require('moment');
    return moment().format("YYYY-MM-DD HH:mm:ss");
};

var userFind = function(_db, _data)
{
    return _db.collection('users').find(_data);
};
var messageListFind = function(_db, _data, _skip)
{
    return _db.collection('messages').find(_data).sort({_id:-1}).skip(_skip).limit(10);
};
/*
 * 접속 된 관리자의 소켓 아이디를 임시 저장
 * 사용자 정보가 갱신 될 때, 저장 된 관리자 소켓 아이디에 갱신 된 정보 Push 하기 위함
*/
var adminProc = function(_iCSN, _sid, _type)
{
    if (isAdmin(_iCSN)===false){
        return;
    }

    if (_type=="join"){
        adminSid[_iCSN] = _sid;
    } else if (_type=="out"){
        delete adminSid[_iCSN];
    }
};
/*
아이피하고 iCSN이 숫자가 아닌 경우 관리자로 처리

관리자페이지에서 접속할 경우 클라이언트에서 iCSN에 아이디 값을 넣어 호출
관리자아이디로 게임을 접속할 경우는 일반 회원과 같이 처리
*/
var isAdmin = function(_iCSN){
    var re = /^[0-9]+$/i;
    if (sip===adminIp && re.test(_iCSN) === false){
        return true;
    } else {
        return false;
    }
};

// 사용자 접속 처리 (_joinData : iCSN, gamecode, server)
var joinUser = function(_joinData, _socket, _callback)
{
    // 사용자 접속 로그
    console.log("join : ["+_joinData.iCSN+" / "+_joinData.gamecode+" / "+_joinData.server+"]");
    // 관리자인지 체크해서 관리자 소켓 임시 저장
    adminProc(_joinData.iCSN, _socket.id, "join");

    // DB 저장
    MongoClient.connect(mongoUrl, function(err, db)
    {
        assert.equal(null, err);
        _joinData.sid = _socket.id;
        _joinData.connectTime = now();

        db.collection('users').update(
            {
                "iCSN"    : _joinData.iCSN,
                "gamecode": _joinData.gamecode,
                "server"  : _joinData.server
            },
            {$set: _joinData},
            {upsert:true}
        );
        // 접속자 정보를 관리자에 전송 (접속자는 소켓정보가 있고, iCSN이 숫자인 경우만 접속자로 처리)
        userFind(db, {"sid":/^\/#/i, "iCSN":{$regex:/^[0-9]+$/i}}).toArray(function(err, result){
            _callback(result);
            //console.log(result);
        });
    });
};

// 사용자 종료 처리
var outUser = function(_socket, _callback)
{
    MongoClient.connect(mongoUrl, function(err, db)
    {
        assert.equal(null, err);
        var userFindData = {
            sid: _socket.id
        };
        userFind(db, userFindData).toArray(function(err, result)
        {
            var _outUserData = {
                "iCSN"          : result[0].iCSN,
                "gamecode"      : result[0].gamecode,
                "server"        : result[0].server,
                "sid"           : "",
                "disconnectTime": now()
            };
            db.collection('users').update(
                {
                    "iCSN"    : _outUserData.iCSN,
                    "gamecode": _outUserData.gamecode,
                    "server"  : _outUserData.server
                },
                {$set: _outUserData},
                {upsert:true}
            );
            // 사용자 종료 로그
            console.log("out  : ["+_outUserData.iCSN+" / "+_outUserData.gamecode+" / "+_outUserData.server+"]");
            // 관리자인지 체크해서 해당 관리자 소켓 삭제
            adminProc(_outUserData.iCSN, _socket.id, "out");

            _callback(db);
        });
    });
};

// 사용자 접속상태 체크
var sendMsg = function(_data, _isBroad){

    if (sip!==adminIp){
        return;
    }

    MongoClient.connect(mongoUrl, function(err, db)
    {
        assert.equal(null, err);

        if (_isBroad){
            insertMessage(_data);
            io.sockets.emit("userGetMsg", _data.msg);
            return;
        } else {
            var checkOnlineData = {
                iCSN    : _data.iCSN,
                gamecode: _data.gamecode,
                server  : _data.server
            };
        }

        userFind(db, checkOnlineData).toArray(function(err, result)
        {
            try {
                if (result[0].sid != ""){
                    console.log("online  : "+_data.iCSN+" / "+_data.gamecode+" / "+_data.server);
                    io.sockets.connected[result[0].sid].emit("userGetMsg", _data.msg);
                } else {
                    console.log("offline : "+_data.iCSN+" / "+_data.gamecode+" / "+_data.server);
                }
                insertMessage(_data);
            } catch (e){
                console.log("offline : "+_data.iCSN+" / "+_data.gamecode+" / "+_data.server);
                insertMessage(_data);
            }
        });
    });
};

// 발송 메세지 DB 저장
var insertMessage = function(_data)
{
    MongoClient.connect(mongoUrl, function(err, db)
    {
        assert.equal(null, err);
        _data.regDate = now();
        db.collection('messages').insert(_data);
    });
};

// 사용자 메세지 리스트 가져오기
function getMessageLists(_data, _skip, _callback)
{
    MongoClient.connect(mongoUrl, function(err, db)
    {
        assert.equal(null, err);
        var data1 = {
            iCSN    :_data.iCSN,
            gamecode:_data.gamecode,
            server  :_data.server
        };
        var data2 = {
            iCSN:{$exists:false}
        };
        messageListFind(db, {$or : [data1,data2]}, _skip).toArray(function(err, result)
        {
            //console.log(result);
            _callback(result);
        });
    });
}

io.on('connection', function(socket)
{
    // 접속자 IP
    sip = socket.request.socket.remoteAddress.split(":")[socket.request.socket.remoteAddress.split(":").length-1];

    // 클라이언트 접속시 최초 호출
    socket.emit('connUser');
    // 접속 호출을 받은 클라언트에서 접속 처리 요청
    socket.on('joinUser', function(data)
    {
        // users DB에 저장
        joinUser(data, socket, function(_result){
            Object.keys(adminSid).forEach(function (key) {
                //console.log(adminSid[key]);
                io.sockets.connected[adminSid[key]].emit("userList", _result);
            });
        });
        // 최초 접속 시, 기존 메세지 전달
        getMessageLists(data, 0, function(_result){
            //console.log(Object.keys(_result).length);
            //console.log(_result);
            //_result = _result.reverse();
            socket.emit('storedMsg', _result);
        });

    });
    // 메세지 더보기 요청
    socket.on('getOldMsg', function(data){
        console.log("getOldMsg Skip : "+data.skip);

        getMessageLists(data.param, data.skip, function(_result){
            //console.log(Object.keys(_result).length);
            //console.log(_result);
            //_result = _result.reverse();
            socket.emit('storedMsg', _result);
        });
    });
    // 관리자에서 특정 클라이언트에게 메세지 전송
    socket.on('userMsg', function(data){
        sendMsg(data, false);
    });
    // 관리자에서 전체 클라이언트에게 메세지 전송
    socket.on('broadMsg', function(data){
        sendMsg(data, true);
    });
    // 종료 콜백
    socket.on('disconnect', function()
    {
        // 종료 회원 DB 처리
        outUser(socket, function(_db){
            // 접속자 정보를 관리자에 전송 (접속자는 소켓정보가 있고, iCSN이 숫자인 경우만 접속자로 처리)
            userFind(_db, {"sid":/^\/#/i, "iCSN":{$regex:/^[0-9]+$/i}}).toArray(function(err, result){
                Object.keys(adminSid).forEach(function (key) {
                    //console.log(adminSid[key]);
                    io.sockets.connected[adminSid[key]].emit("userList", result);
                });
                //console.log(result);
            });

        });
    });
});

위 로직에

var cluster = require('cluster');
var numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  for (var i=0; i<numCPUs; i++) {
    cluster.fork();
  }

} else {

이 부분만 상단에 추가되었는데, 소켓접속을 계속 시도하는 로그가 찍히네요.

  • 상단에 추가한 cluster관련 코드는 else부분에 원래 로직이 들어가고 }닫는 태그는 잘 닫혀있습니다. Manki Kim 2016.7.27 12:33

1 답변

답변을 하려면 로그인이 필요합니다.

프로그래머스 커뮤니티는 개발자들을 위한 Q&A 서비스입니다. 로그인해야 답변을 작성하실 수 있습니다.

(ಠ_ಠ)
(ಠ‿ಠ)