Commit e16aee0f authored by Xiaohe Cao's avatar Xiaohe Cao Committed by Romain Courteaud

Update websocket-server to use chrome.sockets API.

The chrome.socket API is deprecated since Chrome 33.
parent ccbc1ab8
......@@ -6,13 +6,98 @@
var http = function() {
var socket = (chrome.experimental && chrome.experimental.socket) ||
chrome.socket;
if (!chrome.sockets || !chrome.sockets.tcpServer)
return {};
// Wrap chrome.sockets.tcp socketId with a Promise API.
var PSocket = (function() {
// chrome.sockets.tcp uses a global listener for incoming data so
// use a map to dispatch to the proper instance.
var socketMap = {};
chrome.sockets.tcp.onReceive.addListener(function(info) {
var pSocket = socketMap[info.socketId];
if (pSocket) {
if (pSocket.handlers) {
// Fulfil the pending read.
pSocket.handlers.resolve(info.data);
delete pSocket.handlers;
}
else {
// No pending read so put data on the queue.
pSocket.readQueue.push(info);
}
}
});
// Read errors also use a global listener.
chrome.sockets.tcp.onReceiveError.addListener(function(info) {
var pSocket = socketMap[info.socketId];
if (pSocket) {
if (pSocket.handlers) {
// Reject the pending read.
pSocket.handlers.reject(new Error('chrome.sockets.tcp error ' + info.resultCode));
delete pSocket.handlers;
}
else {
// No pending read so put data on the queue.
pSocket.readQueue.push(info);
}
}
});
// If this does not have chrome.socket, then return an empty http namespace.
if (!socket)
return {};
// PSocket constructor.
return function(socketId) {
this.socketId = socketId;
this.readQueue = [];
// Register this instance for incoming data processing.
socketMap[socketId] = this;
chrome.sockets.tcp.setPaused(socketId, false);
};
})();
// Returns a Promise<ArrayBuffer> with read data.
PSocket.prototype.read = function() {
var that = this;
if (this.readQueue.length) {
// Return data from the queue.
var info = this.readQueue.shift();
if (!info.resultCode)
return Promise.resolve(info.data);
else
return Promise.reject(new Error('chrome.sockets.tcp error ' + info.resultCode));
}
else {
// The queue is empty so install handlers.
return new Promise(function(resolve, reject) {
that.handlers = { resolve: resolve, reject: reject };
});
}
};
// Returns a Promise<integer> with the number of bytes written.
PSocket.prototype.write = function(data) {
var that = this;
return new Promise(function(resolve, reject) {
chrome.sockets.tcp.send(that.socketId, data, function(info) {
if (info && info.resultCode >= 0)
resolve(info.bytesSent);
else
reject(new Error('chrome sockets.tcp error ' + (info && info.resultCode)));
});
});
};
// Returns a Promise.
PSocket.prototype.close = function() {
var that = this;
return new Promise(function(resolve, reject) {
chrome.sockets.tcp.disconnect(that.socketId, function() {
chrome.sockets.tcp.close(that.socketId, resolve);
});
});
};
// Http response code strings.
var responseMap = {
200: 'OK',
......@@ -252,46 +337,42 @@ HttpServer.prototype = {
*/
listen: function(port, opt_host) {
var t = this;
socket.create('tcp', {}, function(socketInfo) {
t.socketInfo_ = socketInfo;
socket.listen(t.socketInfo_.socketId, opt_host || '0.0.0.0', port, 50,
function(result) {
t.readyState_ = 1;
t.acceptConnection_(t.socketInfo_.socketId);
chrome.sockets.tcpServer.create(function(socketInfo) {
chrome.sockets.tcpServer.onAccept.addListener(function(acceptInfo) {
if (acceptInfo.socketId === socketInfo.socketId)
t.readRequestFromSocket_(new PSocket(acceptInfo.clientSocketId));
});
chrome.sockets.tcpServer.listen(
socketInfo.socketId,
opt_host || '0.0.0.0',
port,
50,
function(result) {
if (!result) {
t.readyState_ = 1;
}
else {
console.log(
'listen error ' +
chrome.runtime.lastError.message +
' (normal if another instance is already serving requests)');
}
});
});
},
acceptConnection_: function(socketId) {
var t = this;
socket.accept(this.socketInfo_.socketId, function(acceptInfo) {
t.onConnection_(acceptInfo);
t.acceptConnection_(socketId);
});
},
onConnection_: function(acceptInfo) {
this.readRequestFromSocket_(acceptInfo.socketId);
},
readRequestFromSocket_: function(socketId) {
readRequestFromSocket_: function(pSocket) {
var t = this;
var requestData = '';
var endIndex = 0;
var onDataRead = function(readInfo) {
// Check if connection closed.
if (readInfo.resultCode <= 0) {
socket.disconnect(socketId);
socket.destroy(socketId);
return;
}
requestData += arrayBufferToString(readInfo.data).replace(/\r\n/g, '\n');
var onDataRead = function(data) {
requestData += arrayBufferToString(data).replace(/\r\n/g, '\n');
// Check for end of request.
endIndex = requestData.indexOf('\n\n', endIndex);
if (endIndex == -1) {
endIndex = requestData.length - 1;
socket.read(socketId, onDataRead);
return;
return pSocket.read().then(onDataRead);
}
var headers = requestData.substring(0, endIndex).split('\n');
......@@ -306,10 +387,13 @@ HttpServer.prototype = {
if (requestLine.length == 2)
headerMap[requestLine[0]] = requestLine[1].trim();
}
var request = new HttpRequest(headerMap, socketId);
var request = new HttpRequest(headerMap, pSocket);
t.onRequest_(request);
}
socket.read(socketId, onDataRead);
};
pSocket.read().then(onDataRead).catch(function(e) {
pSocket.close();
});
},
onRequest_: function(request) {
......@@ -318,7 +402,7 @@ HttpServer.prototype = {
if (!this.dispatchEvent(type, request))
request.close();
else if (keepAlive)
this.readRequestFromSocket_(request.socketId_);
this.readRequestFromSocket_(request.pSocket_);
},
};
......@@ -338,15 +422,15 @@ var extensionTypes = {
* Constructs an HttpRequest object which tracks all of the request headers and
* socket for an active Http request.
* @param {Object} headers The HTTP request headers.
* @param {number} socketId The socket Id to use for the response.
* @param {Object} pSocket The socket to use for the response.
* @constructor
*/
function HttpRequest(headers, socketId) {
function HttpRequest(headers, pSocket) {
this.version = 'HTTP/1.1';
this.headers = headers;
this.responseHeaders_ = {};
this.headersSent = false;
this.socketId_ = socketId;
this.pSocket_ = pSocket;
this.writes_ = 0;
this.bytesRemaining = 0;
this.finished_ = false;
......@@ -362,11 +446,10 @@ HttpRequest.prototype = {
close: function() {
// The socket for keep alive connections will be re-used by the server.
// Just stop referencing or using the socket in this HttpRequest.
if (this.headers['Connection'] != 'keep-alive') {
socket.disconnect(this.socketId_);
socket.destroy(this.socketId_);
}
this.socketId_ = 0;
if (this.headers['Connection'] != 'keep-alive')
pSocket.close();
this.pSocket_ = null;
this.readyState = 3;
},
......@@ -467,13 +550,12 @@ HttpRequest.prototype = {
write_: function(array) {
var t = this;
this.bytesRemaining += array.byteLength;
socket.write(this.socketId_, array, function(writeInfo) {
if (writeInfo.bytesWritten < 0) {
console.error('Error writing to socket, code '+writeInfo.bytesWritten);
return;
}
t.bytesRemaining -= writeInfo.bytesWritten;
this.pSocket_.write(array).then(function(bytesWritten) {
t.bytesRemaining -= bytesWritten;
t.checkFinished_();
}).catch(function(e) {
console.error(e.message);
return;
});
},
......@@ -505,7 +587,7 @@ WebSocketServer.prototype = {
}
if (this.dispatchEvent('request', new WebSocketRequest(request))) {
if (request.socketId_)
if (request.pSocket_)
request.reject();
return true;
}
......@@ -522,8 +604,8 @@ WebSocketServer.prototype = {
*/
function WebSocketRequest(httpRequest) {
// We'll assume control of the socket for this request.
HttpRequest.apply(this, [httpRequest.headers, httpRequest.socketId_]);
httpRequest.socketId_ = 0;
HttpRequest.apply(this, [httpRequest.headers, httpRequest.pSocket_]);
httpRequest.pSocket_ = null;
}
WebSocketRequest.prototype = {
......@@ -568,9 +650,9 @@ WebSocketRequest.prototype = {
if (this.headers['Sec-WebSocket-Protocol'])
responseHeader['Sec-WebSocket-Protocol'] = this.headers['Sec-WebSocket-Protocol'];
this.writeHead(101, responseHeader);
var socket = new WebSocketServerSocket(this.socketId_);
var socket = new WebSocketServerSocket(this.pSocket_);
// Detach the socket so that we don't use it anymore.
this.socketId_ = 0;
this.pSocket_ = 0;
return socket;
},
......@@ -587,8 +669,9 @@ WebSocketRequest.prototype = {
* a socket which has already been upgraded from an Http request.
* @param {number} socketId The socket id with an active websocket connection.
*/
function WebSocketServerSocket(socketId) {
this.socketId_ = socketId;
function WebSocketServerSocket(pSocket) {
this.pSocket_ = pSocket;
this.readyState = 1;
EventSource.apply(this);
this.readFromSocket_();
}
......@@ -616,8 +699,10 @@ WebSocketServerSocket.prototype = {
* process.
*/
close: function() {
this.sendFrame_(8);
this.readyState = 2;
if (this.readyState === 1) {
this.sendFrame_(8);
this.readyState = 2;
}
},
readFromSocket_: function() {
......@@ -627,17 +712,8 @@ WebSocketServerSocket.prototype = {
var fragmentedOp = 0;
var fragmentedMessages = [];
var onDataRead = function(readInfo) {
if (readInfo.resultCode <= 0) {
t.close_();
return;
}
if (!readInfo.data.byteLength) {
socket.read(t.socketId_, onDataRead);
return;
}
var a = new Uint8Array(readInfo.data);
var onDataRead = function(dataBuffer) {
var a = new Uint8Array(dataBuffer);
for (var i = 0; i < a.length; i++)
data.push(a[i]);
......@@ -708,9 +784,15 @@ WebSocketServerSocket.prototype = {
break; // Insufficient data, wait for more.
}
}
socket.read(t.socketId_, onDataRead);
return t.pSocket_.read().then(onDataRead);
};
socket.read(this.socketId_, onDataRead);
this.pSocket_.read().then(function(data) {
return onDataRead(data);
}).catch(function(e) {
t.close_();
});
},
onFrame_: function(op, data) {
......@@ -727,8 +809,9 @@ WebSocketServerSocket.prototype = {
this.dispatchEvent('message', {'data': data});
} else if (op == 8) {
// A close message must be confirmed before the websocket is closed.
if (this.readyState == 1) {
if (this.readyState === 1) {
this.sendFrame_(8);
this.readyState = 2;
} else {
this.close_();
return false;
......@@ -780,19 +863,20 @@ WebSocketServerSocket.prototype = {
return buffer;
}
var array = WebsocketFrameData(op, data || '');
socket.write(this.socketId_, array, function(writeInfo) {
if (writeInfo.resultCode < 0 ||
writeInfo.bytesWritten !== array.byteLength) {
t.close_();
}
this.pSocket_.write(array).then(function(bytesWritten) {
if (bytesWritten !== array.byteLength)
throw new Error('insufficient write');
}).catch(function(e) {
t.close_();
});
},
close_: function() {
chrome.socket.disconnect(this.socketId_);
chrome.socket.destroy(this.socketId_);
this.readyState = 3;
this.dispatchEvent('close');
if (this.readyState !== 3) {
this.pSocket_.close();
this.readyState = 3;
this.dispatchEvent('close');
}
}
};
......
......@@ -64,12 +64,12 @@ if (http.Server && http.WebSocketServer) {
wsServer.addEventListener('request', function(req) {
var socket = req.accept();
socket_dict[socket.socketId_] = {
socket_dict[socket.pSocket_.socketId] = {
socket: socket
};
chrome.socket.getInfo(socket.socketId_, function (result) {
chrome.sockets.tcp.getInfo(socket.pSocket_.socketId, function (result) {
if ((result !== undefined) && (result.connected)) {
socket_dict[socket.socketId_].peer = result.peerAddress + ':' + result.peerPort;
socket_dict[socket.pSocket_.socketId].peer = result.peerAddress + ':' + result.peerPort;
updateStatus(socket_dict);
}
});
......@@ -87,7 +87,7 @@ if (http.Server && http.WebSocketServer) {
// When a socket is closed, remove it from the list of connected sockets.
socket.addEventListener('close', function() {
delete socket_dict[socket.socketId_];
delete socket_dict[socket.pSocket_.socketId];
updateStatus(socket_dict);
});
return true;
......
......@@ -6,10 +6,17 @@
"minimum_chrome_version": "44",
"offline_enabled": true,
"manifest_version": 2,
"permissions": ["system.network", {
"socket": [
"tcp-connect",
"tcp-listen"]}],
"permissions": ["system.network"],
"sockets": {
"tcp": {
"connect": "*"
},
"tcpServer": {
"listen": "*"
}
},
"app": {
"background": {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment