Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit sentFrame and receivedFrame events from endpoint. #135

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 38 additions & 4 deletions lib/http.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
// - **Event: 'connection' (socket, [endpoint])**: there's a second argument if the negotiation of
// HTTP/2 was successful: the reference to the [Endpoint](endpoint.html) object tied to the
// socket.
// - **Event: 'endpoint' (endpoint)**: the endpoint created by the new connection. Emitted
// before any data has been transmitted.
//
// - **http2.createServer(options, [requestListener])**: additional option:
// - **log**: an optional [bunyan](https://github.com/trentm/node-bunyan) logger object
Expand All @@ -38,13 +40,16 @@
// - **http2.request(options, [callback])**: additional option:
// - **plain**: if `true`, the client will not try to build a TLS tunnel, instead it will use
// the raw TCP stream for HTTP/2
// - **onEndpoint**: A event handler for the endpoint event.
//
// - **Class: http2.ClientRequest**
// - **Event: 'socket' (socket)**: in case of an HTTP/2 incoming message, `socket` is a reference
// to the associated [HTTP/2 Stream](stream.html) object (and not to the TCP socket).
// - **Event: 'push' (promise)**: signals the intention of a server push associated to this
// request. `promise` is an IncomingPromise. If there's no listener for this event, the server
// push is cancelled.
// - **Event: 'endpoint'**: Emited when the endpoint for the request has been created. If the
// request is reusing a connection, the existing Endpoint is emitted.
// - **request.setPriority(priority)**: assign a priority to this request. `priority` is a number
// between 0 (highest priority) and 2^31-1 (lowest priority). Default value is 2^30.
//
Expand Down Expand Up @@ -446,7 +451,15 @@ Server.prototype = Object.create(EventEmitter.prototype, { constructor: { value:

// Starting HTTP/2
Server.prototype._start = function _start(socket) {
var endpoint = new Endpoint(this._log, 'SERVER', this._settings);
var self = this;
var endpoint = new Endpoint({
log: this._log,
role: 'SERVER',
settings: this._settings,
onEndpoint: function (endpoint) {
self.emit('endpoint', endpoint);
}
});

this._log.info({ e: endpoint,
client: socket.remoteAddress + ':' + socket.remotePort,
Expand Down Expand Up @@ -841,6 +854,11 @@ Agent.prototype.request = function request(options, callback) {

var request = new OutgoingRequest(this._log);

// * dealing with endpoint event handler
if (typeof options.onEndpoint === 'function') {
request.addListener('endpoint', options.onEndpoint);
}

if (callback) {
request.on('response', callback);
}
Expand All @@ -854,17 +872,27 @@ Agent.prototype.request = function request(options, callback) {
// * There's an existing HTTP/2 connection to this host
if (key in this.endpoints) {
var endpoint = this.endpoints[key];
this.emit('endpoint', endpoint);

request._start(endpoint.createStream(), options);
}

// * HTTP/2 over plain TCP
else if (options.plain) {
endpoint = new Endpoint(this._log, 'CLIENT', this._settings);
endpoint = new Endpoint({
log: this._log,
role: 'CLIENT',
settings: this._settings,
onEndpoint: options.onEndpoint
});

endpoint.socket = net.connect({
host: options.host,
port: options.port,
localAddress: options.localAddress
});
this.endpoints[key] = endpoint;

endpoint.pipe(endpoint.socket).pipe(endpoint);
request._start(endpoint.createStream(), options);
}
Expand All @@ -879,7 +907,7 @@ Agent.prototype.request = function request(options, callback) {
options.ciphers = options.ciphers || cipherSuites;
var httpsRequest = https.request(options);

httpsRequest.on('socket', function(socket) {
httpsRequest.on('socket', function (socket) {
var negotiatedProtocol = socket.alpnProtocol || socket.npnProtocol;
if (negotiatedProtocol != null) { // null in >=0.11.0, undefined in <0.11.0
negotiated();
Expand All @@ -895,7 +923,13 @@ Agent.prototype.request = function request(options, callback) {
if (negotiatedProtocol === protocol.VERSION) {
httpsRequest.socket.emit('agentRemove');
unbundleSocket(httpsRequest.socket);
endpoint = new Endpoint(self._log, 'CLIENT', self._settings);
endpoint = new Endpoint({
log: self._log,
role: 'CLIENT',
settings: self._settings,
onEndpoint: options.onEndpoint
});

endpoint.socket = httpsRequest.socket;
endpoint.pipe(endpoint.socket).pipe(endpoint);
}
Expand Down
46 changes: 37 additions & 9 deletions lib/protocol/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@ exports.Connection = Connection;
// Public API
// ----------

// * **new Connection(log, firstStreamId, settings)**: create a new Connection
// * **new Connection(config)**: create a new Connection
// - `config.log`: bunyan logger of the parent
// - `config.firstStreamId`: the ID of the first outbound stream
// - `config.settings`: initial HTTP/2 settings
// - `config.onSentFrame`: Event handler for sentFrame event
// - `config.onReceivedFrame`: Event handler for receivedFrame event
//
// * **Event: 'error' (type)**: signals a connection level error made by the other end
//
Expand All @@ -23,6 +28,10 @@ exports.Connection = Connection;
//
// * **Event: 'stream' (stream)**: signals that there's an incoming stream
//
// * **Event: 'sentFrame' (frame)**: signals a frame has been sent to the remote.
//
// * **Event: 'receivedFrame' (frame)**: signals a frame has been received from the remote.
//
// * **createStream(): stream**: initiate a new stream
//
// * **set(settings, callback)**: change the value of one or more settings according to the
Expand All @@ -36,15 +45,27 @@ exports.Connection = Connection;
// -----------

// The main aspects of managing the connection are:
function Connection(log, firstStreamId, settings) {
function Connection(config) {
// * initializing the base class
Flow.call(this, 0);

// * save settings object
this._config = config || {};

// * attach frame events
if (typeof this._config.onSentFrame === 'function') {
this.addListener('sentFrame', this._config.onSentFrame);
}

if (typeof this._config.onReceivedFrame === 'function') {
this.addListener('receivedFrame', this._config.onReceivedFrame);
}

// * logging: every method uses the common logger object
this._log = log.child({ component: 'connection' });
this._log = this._config.log.child({component: 'connection'});

// * stream management
this._initializeStreamManagement(firstStreamId);
this._initializeStreamManagement(this._config.firstStreamId);

// * lifecycle management
this._initializeLifecycleManagement();
Expand All @@ -53,7 +74,7 @@ function Connection(log, firstStreamId, settings) {
this._initializeFlowControl();

// * settings management
this._initializeSettingsManagement(settings);
this._initializeSettingsManagement(this._config.connectionSettings);

// * multiplexing
this._initializeMultiplexing();
Expand Down Expand Up @@ -308,6 +329,8 @@ priority_loop:
continue;
}

this.emit('sentFrame', frame);

nextBucket.push(stream);

if (frame.stream === undefined) {
Expand Down Expand Up @@ -371,6 +394,7 @@ Connection.prototype._receive = function _receive(frame, done) {
// * and writes it to the `stream`'s `upstream`
stream.upstream.write(frame);

this.emit('receivedFrame', frame);
done();
};

Expand Down Expand Up @@ -451,13 +475,17 @@ Connection.prototype.set = function set(settings, callback) {
}
});

// * Sending out the SETTINGS frame
this.push({
var settingsFrame = {
type: 'SETTINGS',
flags: { ACK: false },
flags: {ACK: false},
stream: 0,
settings: settings
});
};

this.emit('sentFrame', settingsFrame);

// * Sending out the SETTINGS frame
this.push(settingsFrame);
for (var name in settings) {
this.emit('SENDING_' + name, settings[name]);
}
Expand Down
48 changes: 37 additions & 11 deletions lib/protocol/endpoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ exports.Endpoint = Endpoint;
// Public API
// ----------

// - **new Endpoint(log, role, settings, filters)**: create a new Endpoint.
// - **new Endpoint(config)**: create a new Endpoint.
//
// - `log`: bunyan logger of the parent
// - `role`: 'CLIENT' or 'SERVER'
// - `settings`: initial HTTP/2 settings
// - `filters`: a map of functions that filter the traffic between components (for debugging or
// - `config.log`: bunyan logger of the parent
// - `config.role`: 'CLIENT' or 'SERVER'
// - `config.settings`: initial HTTP/2 settings
// - `config.filters`: a map of functions that filter the traffic between components (for debugging or
// intentional failure injection).
//
// Filter functions get three arguments:
Expand All @@ -37,6 +37,11 @@ exports.Endpoint = Endpoint;
//
// * **Event: 'stream' (Stream)**: 'stream' event forwarded from the underlying Connection
//
// * **Event: 'sentFrame' (frame)**: signals a frame has been sent to the remote. Raised from underlying connection.
//
// * **Event: 'receivedFrame' (frame)**: signals a frame has been received to the remote. Raised from
// underlying connection.
//
// * **Event: 'error' (type)**: signals an error
//
// * **createStream(): Stream**: initiate a new stream (forwarded to the underlying Connection)
Expand All @@ -47,16 +52,25 @@ exports.Endpoint = Endpoint;
// -----------

// The process of initialization:
function Endpoint(log, role, settings, filters) {
function Endpoint(config) {
Duplex.call(this);

this._config = config || {};

// * Handle onEndpoint
if (typeof this._config.onEndpoint === 'function') {
this.addListener('endpoint', this._config.onEndpoint);
}

this.emit('endpoint', this);

// * Initializing logging infrastructure
this._log = log.child({ component: 'endpoint', e: this });
this._log = this._config.log.child({component: 'endpoint', e: this});

// * First part of the handshake process: sending and receiving the client connection header
// prelude.
assert((role === 'CLIENT') || role === 'SERVER');
if (role === 'CLIENT') {
assert((this._config.role === 'CLIENT') || this._config.role === 'SERVER');
if (this._config.role === 'CLIENT') {
this._writePrelude();
} else {
this._readPrelude();
Expand All @@ -65,7 +79,7 @@ function Endpoint(log, role, settings, filters) {
// * Initialization of component. This includes the second part of the handshake process:
// sending the first SETTINGS frame. This is done by the connection class right after
// initialization.
this._initializeDataFlow(role, settings, filters || {});
this._initializeDataFlow(this._config.role, this._config.settings, this._config.filters || {});

// * Initialization of management code.
this._initializeManagement();
Expand Down Expand Up @@ -169,6 +183,8 @@ function pipeAndFilter(stream1, stream2, filter) {

Endpoint.prototype._initializeDataFlow = function _initializeDataFlow(role, settings, filters) {
var firstStreamId, compressorRole, decompressorRole;
var self = this;

if (role === 'CLIENT') {
firstStreamId = 1;
compressorRole = 'REQUEST';
Expand All @@ -183,7 +199,17 @@ Endpoint.prototype._initializeDataFlow = function _initializeDataFlow(role, sett
this._deserializer = new Deserializer(this._log);
this._compressor = new Compressor(this._log, compressorRole);
this._decompressor = new Decompressor(this._log, decompressorRole);
this._connection = new Connection(this._log, firstStreamId, settings);
this._connection = new Connection({
log: this._log,
firstStreamId: firstStreamId,
connectionSettings: settings,
onSentFrame: function (frame) {
self.emit('sentFrame', frame);
},
onReceivedFrame: function (frame) {
self.emit('receivedFrame', frame);
}
});

pipeAndFilter(this._connection, this._compressor, filters.beforeCompression);
pipeAndFilter(this._compressor, this._serializer, filters.beforeSerialization);
Expand Down
6 changes: 3 additions & 3 deletions test/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ describe('connection.js', function() {
describe('invalid operation', function() {
describe('unsolicited ping answer', function() {
it('should be ignored', function() {
var connection = new Connection(util.log, 1, settings);
var connection = new Connection({log: util.log, firstStreamId: 1, connectionSettings: settings});

connection._receivePing({
stream: 0,
Expand All @@ -82,8 +82,8 @@ describe('connection.js', function() {
describe('test scenario', function() {
var c, s;
beforeEach(function() {
c = new Connection(util.log.child({ role: 'client' }), 1, settings);
s = new Connection(util.log.child({ role: 'client' }), 2, settings);
c = new Connection({log: util.log.child({ role: 'client' }), firstStreamId: 1, connectionSettings: settings});
s = new Connection({log: util.log.child({ role: 'client' }), firstStreamId: 2, connectionSettings: settings});
c.pipe(s).pipe(c);
});

Expand Down
24 changes: 20 additions & 4 deletions test/endpoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,16 @@ describe('endpoint.js', function() {
describe('scenario', function() {
describe('connection setup', function() {
it('should work as expected', function(done) {
var c = new Endpoint(util.log.child({ role: 'client' }), 'CLIENT', settings);
var s = new Endpoint(util.log.child({ role: 'client' }), 'SERVER', settings);
var c = new Endpoint({
log: util.log.child({role: 'client'}),
role: 'CLIENT',
settings: settings
});
var s = new Endpoint({
log: util.log.child({role: 'client'}),
role: 'SERVER',
settings: settings
});

util.log.debug('Test initialization over, starting piping.');
c.pipe(s).pipe(c);
Expand All @@ -30,8 +38,16 @@ describe('endpoint.js', function() {
describe('`e`', function() {
var format = endpoint.serializers.e;
it('should assign a unique ID to each endpoint', function() {
var c = new Endpoint(util.log.child({ role: 'client' }), 'CLIENT', settings);
var s = new Endpoint(util.log.child({ role: 'client' }), 'SERVER', settings);
var c = new Endpoint({
log: util.log.child({role: 'client'}),
role: 'CLIENT',
settings: settings
});
var s = new Endpoint({
log: util.log.child({role: 'client'}),
role: 'SERVER',
settings: settings
});
expect(format(c)).to.not.equal(format(s));
expect(format(c)).to.equal(format(c));
expect(format(s)).to.equal(format(s));
Expand Down
Loading