IO.js Cluster

2018-11-28 22:33 更新

穩(wěn)定度: 2 - 穩(wěn)定

單個的io.js實例運行在單線程上。為了享受多核系統(tǒng)的優(yōu)勢,用戶需要啟動一個io.js集群來處理負載。

cluster模塊允許你方便地創(chuàng)建共享服務器端口的子進程:

var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  // Fork workers.
  for (var i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', function(worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
  });
} else {
  // Workers can share any TCP connection
  // In this case its a HTTP server
  http.createServer(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
  }).listen(8000);
}

啟動io.js將會在工作線程中共享8000端口:

% NODE_DEBUG=cluster iojs server.js
23521,Master Worker 23524 online
23521,Master Worker 23526 online
23521,Master Worker 23523 online
23521,Master Worker 23528 online

這個特性是最近才開發(fā)的,并且可能在未來有所改變。請試用它并提供反饋。

注意,在Windows中,在工作進程中建立命名管道服務器目前是不可行的。

工作原理

工作進程通過child_process.fork方法被創(chuàng)建,所以它們可以與父進程通過IPC管道溝通以及相互傳遞服務器句柄。

集群模式支持兩種分配傳入連接的方式。

第一種(并且是除了Windows平臺外默認的方式)是循環(huán)式。主進程監(jiān)聽一個端口,接受新連接,并且以輪流的方式分配給工作進程,并且以一些內建機制來避免一個工作進程過載。

第二種方式是,主進程建立監(jiān)聽socket并且將它發(fā)送給感興趣的工作進程。工作進程直接接受傳入的連接。

第二種方式理論上有最好的性能。但是在實踐中,操作系統(tǒng)的調度不可預測,分配往往十分不平衡。負載曾被觀察到8個進程中,超過70%的連接結束于其中的2個進程。

因為server.listen()將大部分工作交給了主進程,所以一個普通的io.js進程和一個集群工作進程會在三種情況下有所區(qū)別:

  1. server.listen({fd: 7}) 因為消息被傳遞給了主進程,主進程的文件描述符7會被監(jiān)聽,并且句柄會被傳遞給工作進程而不是監(jiān)聽工作進程中文件描述符為7的東西。

  2. server.listen(handle) 明確地監(jiān)聽句柄,會讓工作進程使用給定的句柄,而不是與主進程通信。如果工作進程已經有了此句柄,那么將假設你知道你在做什么。

  3. server.listen(0) 通常,這會導致服務器監(jiān)聽一個隨機端口。但是,在集群中,每次調用listen(0)時,每一個工作進程會收到同樣的“隨機”端口。也就是說,端口只是在第一次方法被調用時是隨機的,但在之后是可預知的。如果你想監(jiān)聽特定的端口,則根據(jù)工作進程的PID來生成端口號。

由于在io.js或你的程序中的工作進程間沒有路由邏輯也沒有共享的狀態(tài)。所以,請不要為你的程序設計成依賴太重的內存數(shù)據(jù)對象,如設計會話和登陸時。

因為工作進程都是獨立的進程,它們可以根據(jù)你程序的需要被殺死或被創(chuàng)建,并且并不會影響到其他工作進程。只要有活躍的工作進程,那么服務器就會繼續(xù)接收連接。但是io.js不會自動地為你管理工作進程數(shù)。所以根據(jù)你的應用需求來管理工作進程池是你的責任。

cluster.schedulingPolicy

調度策略,選擇cluster.SCHED_RR來使用循環(huán)式,或選擇cluster.SCHED_NONE來由操作系統(tǒng)處理。這是一個全局設定,并且在你第一次啟動了一個工作進程或調用cluster.setupMaster()方法后就不可再更改。

SCHED_RR是除了Windows外其他操作系統(tǒng)中的默認值。一旦libuv能夠有效地分配IOCP句柄并且沒有巨大的性能損失,那么Windows下的默認值也會變?yōu)樗?/p>

cluster.schedulingPolicy也可以通過環(huán)境變量NODE_CLUSTER_SCHED_POLICY來設定。合法值為rrnone

cluster.settings

  • Object

  • execArgv Array 傳遞給io.js執(zhí)行的字符串參數(shù)(默認為process.execArgv
  • exec String 工作進程文件的路徑(默認為process.argv[1]
  • args Array 傳遞給工作進程的字符串參數(shù)(默認為process.argv.slice(2)
  • silent Boolean 是否將工作進程的輸出傳遞給父進程的stdio(默認為false
  • uid Number 設置用戶進程的ID
  • gid Number 設置進程組的ID

在調用.setupMaster()(或.fork())方法之后,這個settings對象會存放方法的配置,包括默認值。

因為.setupMaster()僅能被調用一次,所以這個對象被設置后便不可更改。

這個對象不應由你來手工更改或設置。

cluster.isMaster

  • Boolean

如果進程是主進程則返回true。這由process.env.NODE_UNIQUE_ID決定。如果process.env.NODE_UNIQUE_IDundefined,那么就返回true。

cluster.isWorker

  • Boolean

如果進程不是主進程則返回true

Event: 'fork'

  • worker Worker object

當一個新的工作進程由cluster模塊所開啟時會觸發(fā)fork事件。這能被用來記錄工作進程活動日志,或創(chuàng)建自定義的超時。

var timeouts = [];
function errorMsg() {
  console.error("Something must be wrong with the connection ...");
}

cluster.on('fork', function(worker) {
  timeouts[worker.id] = setTimeout(errorMsg, 2000);
});
cluster.on('listening', function(worker, address) {
  clearTimeout(timeouts[worker.id]);
});
cluster.on('exit', function(worker, code, signal) {
  clearTimeout(timeouts[worker.id]);
  errorMsg();
});

Event: 'online'

  • worker Worker object

當創(chuàng)建了一個新的工作線程后,工作線程必須響應一個在線信息。當主進程接收到在線信息后它會觸發(fā)這個事件。forkonline事件的區(qū)別在于:fork是主進程創(chuàng)建了工作進程后觸發(fā),online是工作進程開始運行時觸發(fā)。

cluster.on('online', function(worker) {
  console.log("Yay, the worker responded after it was forked");
});

Event: 'listening'

  • worker Worker object
  • address Object

當工作進程調用listen()方法。服務器會觸發(fā)listening事件,集群中的主進程也會觸發(fā)一個listening事件。

這個事件的回調函數(shù)包含兩個參數(shù),第一個worker是一個包含工作進程的對象,address對象是一個包含以下屬性的對象:address,portaddressType。當工作進程監(jiān)聽多個地址時,這非常有用。

cluster.on('listening', function(worker, address) {
  console.log("A worker is now connected to " + address.address + ":" + address.port);
});

addressType是以下中的一個:

  • 4 (TCPv4)
  • 6 (TCPv6)
  • -1 (unix domain socket)
  • "udp4" 或 "udp6" (UDP v4 或 v6)

Event: 'disconnect'

  • worker Worker object

當工作進程的IPC信道斷開連接時觸發(fā)。這個事件當工作進程優(yōu)雅地退出,被殺死,或手工斷開連接(如調用worker.disconnect())后觸發(fā)。

disconnectexit事件之間可能存在延遲。這兩個事件可以用來偵測是否進程在清理的過程中被阻塞,或者是否存在長連接。

cluster.on('disconnect', function(worker) {
  console.log('The worker #' + worker.id + ' has disconnected');
});

Event: 'exit'

  • worker Worker object
  • code Number 如果正常退出,則為退出碼
  • signal String 導致進程被殺死的信號的信號名(如'SIGHUP')

當任何一個工作進程結束時,cluster模塊會觸發(fā)一個exit事件。

這可以被用來通過再次調用.fork()方法重啟服務器。

cluster.on('exit', function(worker, code, signal) {
  console.log('worker %d died (%s). restarting...',
    worker.process.pid, signal || code);
  cluster.fork();
});

參閱child_process事件:exit

Event: 'setup'

  • settings Object

每次.setupMaster()方法被調用時觸發(fā)。

這個settings對象與.setupMaster()被調用時cluster.settings對象相同,并且僅供查詢,因為.setupMaster()可能在一次事件循環(huán)里被調用多次。

如果保持精確十分重要,請使用cluster.settings。

cluster.setupMaster([settings])

  • settings Object

  • exec String 工作進程文件的路徑(默認為process.argv[1]
  • args Array 傳遞給工作進程的參數(shù)字符串(默認為process.argv.slice(2)
  • silent Boolean 是否將工作進程的輸出傳遞給父進程的stdio(默認為false)

setupMaster方法被用來改變默認的fork行為。一旦被調用,settings參數(shù)將被表現(xiàn)為cluster.settings

注意:

  • 任何settings的改變僅影響之后的.fork()調用,而不影響已經運行中的工作進程
  • 工作進程中唯一不同通過.setupMaster()來設置的屬性是傳遞給.fork()方法的env參數(shù)
  • 上文中的參數(shù)的默認值僅在第一次調用時被應用,之后的調用的默認值是當前cluster.setupMaster()被調用時的值。

例子:

var cluster = require('cluster');
cluster.setupMaster({
  exec: 'worker.js',
  args: ['--use', 'https'],
  silent: true
});
cluster.fork(); // https worker
cluster.setupMaster({
  args: ['--use', 'http']
});
cluster.fork(); // http worker

這只能被主進程調用。

cluster.fork([env])

  • env Object 將添加到工作進程環(huán)境變量的鍵值對
  • return Worker object

創(chuàng)建一個新的工作進程。

這只能被主進程調用。

cluster.disconnect([callback])

  • callback Function 當所有的工作進程斷開連接并且所有句柄關閉后調用

cluster.workers中的每一個工作進程中調用.disconnect()。

當所有進程斷開連接,所有內部的句柄都將關閉,如果沒有其他的事件處于等待,將允許主進程優(yōu)雅地退出。

這個方法接受一個可選的將會在結束時觸發(fā)的回調函數(shù)參數(shù)。

這只能被主進程調用。

cluster.worker

  • Object

當前工作進程對象的引用。對于主進程不可用。

var cluster = require('cluster');

if (cluster.isMaster) {
  console.log('I am master');
  cluster.fork();
  cluster.fork();
} else if (cluster.isWorker) {
  console.log('I am worker #' + cluster.worker.id);
}

cluster.workers

  • Object

一個儲存了所有活躍的工作進程對象的哈希表,以id字段為主鍵。這使得遍歷所有工作進程變得容易。僅在主進程中可用。

當工作進程斷開連接或退出時,它會從cluster.workers中移除。這個兩個事件的觸發(fā)順序不能被提前決定。但是,能保證的是,從cluster.workers移除一定發(fā)生在這兩個事件觸發(fā)之后。

// Go through all workers
function eachWorker(callback) {
  for (var id in cluster.workers) {
    callback(cluster.workers[id]);
  }
}
eachWorker(function(worker) {
  worker.send('big announcement to all workers');
});

想要跨越通信信道來得到一個工作進程的引用時,使用工作進程的唯一id能簡單找到工作進程。

socket.on('data', function(id) {
  var worker = cluster.workers[id];
});

Class: Worker

Worker對象包含了一個工作進程所有的公開信息和方法。在主進程中它可以通過cluster.workers取得。在工作進程中它可以通過cluster.worker取得。

worker.id

  • String

每一個新的工作進程都被給予一個獨一無二的id,這個id被存儲在此id屬性中。

當一個工作進程活躍時,這是它被索引在cluster.workers中的主鍵。

worker.process

  • ChildProcess object

所有的工作進程都通過child_process.fork()被創(chuàng)建,返回的對象被作為.process屬性存儲。在一個工作進程中,全局的process被存儲。

參閱Child Process module

注意,如果在進程中disconnect事件觸發(fā)并且.suicide屬性不為true,那么進程會調用process.exit(0)。這防止了意外的斷開連接。

worker.suicide

  • Boolean

通過調用.kill().disconnect()設置,在這之前他為undefined。

布爾值worker.suicide使你可以區(qū)別自發(fā)和意外的退出,主進程可以通過這個值來決定使用重新創(chuàng)建一個工作進程。

cluster.on('exit', function(worker, code, signal) {
  if (worker.suicide === true) {
    console.log('Oh, it was just suicide\' – no need to worry').
  }
});

// kill worker
worker.kill();

worker.send(message[, sendHandle])

  • message Object
  • sendHandle Handle object

給工作進程或主進程發(fā)生一個信息,可選得添加一個句柄。

在主進程中它將給特定的工作進程發(fā)送一個信息。它指向child.send()

在工作進程中它將給主進程發(fā)送一個信息。它指向process.send()。

下面的例子將來自主進程的所有信息返回:

if (cluster.isMaster) {
  var worker = cluster.fork();
  worker.send('hi there');

} else if (cluster.isWorker) {
  process.on('message', function(msg) {
    process.send(msg);
  });
}

worker.kill([signal='SIGTERM'])

  • signal String 傳遞給工作進程的結束信號名

這個函數(shù)將會殺死工作進程。在主進程中,它通過斷開worker.process做到,并且一旦斷開,使用signal殺死進程。在工作進程中,它通過斷開信道做到,然后使用退出碼0退出。

會導致.suicide被設置。

為了向后兼任,這個方法的別名是worker.destroy()。

注意在工作進程中,process.kill()存在,但它不是這個函數(shù)。是process.kill(pid[, signal])。

worker.disconnect()

在工作進程中,這個函數(shù)會關閉所有的服務器,等待這些服務器上的close事件,然后斷開IPC信道。

在主進程中,一個內部信息會被傳遞給工作進程,至使它們自行調用.disconnect()

會導致.suicide被設置。

注意在一個服務器被關閉后,它將不會再接受新連接,但是連接可能被其他正在監(jiān)聽的工作進程所接收。已存在的連接將會被允許向往常一樣退出。當沒有更多的連接存在時,工作進程的IPC信道會關閉并使之優(yōu)雅地退出,參閱server.close()。

以上說明僅應用于服務器連接,客戶端連接將不會自動由工作進程關閉,并且在退出前,不會等到連接退出。

注意在工作進程中,process.disconnect存在,但它不是這個函數(shù)。是child.disconnect()。

由于長連接可能會阻塞工作進程的退出,這時傳遞一個動作信息非常有用,應用來根據(jù)信息指定的動作來關閉它們。超時機制是上述的有用實現(xiàn),在disconnect事件在指定時長后沒有觸發(fā)時,殺死工作進程。

if (cluster.isMaster) {
  var worker = cluster.fork();
  var timeout;

  worker.on('listening', function(address) {
    worker.send('shutdown');
    worker.disconnect();
    timeout = setTimeout(function() {
      worker.kill();
    }, 2000);
  });

  worker.on('disconnect', function() {
    clearTimeout(timeout);
  });

} else if (cluster.isWorker) {
  var net = require('net');
  var server = net.createServer(function(socket) {
    // connections never end
  });

  server.listen(8000);

  process.on('message', function(msg) {
    if(msg === 'shutdown') {
      // initiate graceful close of any connections to server
    }
  });
}

worker.isDead()

如果工作進程已經被關閉,則返回true

worker.isConnected()

如果工作進程通過它的IPC信道連接到主進程,則返回true。一個工作進程在被創(chuàng)建后連接到它的主進程。在disconnect事件觸發(fā)后它會斷開連接。

Event: 'message'

  • message Object

這個事件與child_process.fork()所提供的事件完全相同。

在工作進程中你也可以使用process.on('message')

例子,這里有一個集群,使用消息系統(tǒng)在主進程中統(tǒng)計請求的數(shù)量:

var cluster = require('cluster');
var http = require('http');

if (cluster.isMaster) {

  // Keep track of http requests
  var numReqs = 0;
  setInterval(function() {
    console.log("numReqs =", numReqs);
  }, 1000);

  // Count requestes
  function messageHandler(msg) {
    if (msg.cmd && msg.cmd == 'notifyRequest') {
      numReqs += 1;
    }
  }

  // Start workers and listen for messages containing notifyRequest
  var numCPUs = require('os').cpus().length;
  for (var i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  Object.keys(cluster.workers).forEach(function(id) {
    cluster.workers[id].on('message', messageHandler);
  });

} else {

  // Worker processes have a http server.
  http.Server(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");

    // notify master about the request
    process.send({ cmd: 'notifyRequest' });
  }).listen(8000);
}

Event: 'online'

cluster.on('online')事件相似,但指向了特定的工作進程。

cluster.fork().on('online', function() {
  // Worker is online
});

這不是在工作進程中觸發(fā)的。

Event: 'listening'

  • address Object

cluster.on('listening')事件相似,但指向了特定的工作進程。

cluster.fork().on('listening', function(address) {
  // Worker is listening
});

這不是在工作進程中觸發(fā)的。

Event: 'disconnect'

cluster.on('disconnect')事件相似,但指向了特定的工作進程。

cluster.fork().on('disconnect', function() {
  // Worker has disconnected
});

Event: 'exit'

  • code Number 如果正常退出,則為退出碼
  • signal String 導致進程被殺死的信號名(如'SIGHUP'

cluster.on('exit')事件相似,但指向了特定的工作進程。

var worker = cluster.fork();
worker.on('exit', function(code, signal) {
  if( signal ) {
    console.log("worker was killed by signal: "+signal);
  } else if( code !== 0 ) {
    console.log("worker exited with error code: "+code);
  } else {
    console.log("worker success!");
  }
});

Event: 'error'

這個事件與child_process.fork()所提供的事件完全相同。

在工作進程中你也可以使用process.on('error')

以上內容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號