
I am trying to implement a protocol to send and receive files over a socket. The protocol is already specified and I cannot change it.

I am new to Node.js, and this is how I am trying to implement it.

I will write a duplex stream and pipe the file into it, then pipe that into the socket to send the data.

The confusion is about where I should read and where I should write: how do I know reading the file has finished, and how do I tell the socket that it is finished? The docs are not very clear to me, and googling added more confusion :)

Any help would be appreciated.

P.S. I will add my own samples when I get home; I just don't have them now.

EDIT

After @MattHarrison's answer, I changed the code to this:

var stream = require('stream');
var util = require('util');
var bufferpack = require('bufferpack');
var fs = require('fs');
var net = require('net');

var MyProtocolStream = function () {

    this.writtenHeader = false;            // have we written the header yet?
    stream.Transform.call(this);
};

util.inherits(MyProtocolStream, stream.Transform);

MyProtocolStream.prototype._transform = function (chunk, encoding, callback) {

    if (!this.writtenHeader) {
        this.push('==== HEADER ====\n');  // if we've not, send the header first
        this.writtenHeader = true;
    }

    // Can this function be interrupted at this very line?
    // Then another _transform comes in and pushes its own data to socket
    // Corrupted data maybe then?
    // How to prevent this behavior? Buffering whole file before sending?

    var self = this;
    // I put a random timeout to simulate overlapped calls
    // Can this happen in real world?
    setTimeout(function () {
        self.push(chunk);  // send the incoming file chunks along as-is
        callback();
    }, Math.random()*10);
};

MyProtocolStream.prototype._flush = function (callback) {

    this.push('==== FOOTER ====\n');      // Just before the stream closes, send footer
    callback();
};

var file = '/tmp/a';

var server = net.createServer(function (sck) {
    sck.addr = sck.remoteAddress;
    console.log('Client connected - ' + sck.addr);
    fs.createReadStream('/tmp/a').pipe(new MyProtocolStream()).pipe(sck);
    fs.createReadStream('/tmp/b').pipe(new MyProtocolStream()).pipe(sck);
    fs.createReadStream('/tmp/c').pipe(new MyProtocolStream()).pipe(sck);
    sck.on('close', function () {
        console.log('Client disconnected - ' + this.addr);
    })
});

server.listen(22333, function () {
    console.log('Server started on ' + 22333)
});

See my comments in _transform.

  • The question is too vague at the moment. You should at least try something yourself and then ask a specific question regarding a pain point that we can help with. Commented Oct 5, 2015 at 15:13
  • @MattHarrison You are right. I am home now, and added my sample code. Commented Oct 5, 2015 at 16:42
  • You say you're trying to implement a custom protocol, but it looks like you're just sending a file to a socket while reinventing node's streams. What does your custom protocol actually do? Maybe you should start with that. Commented Oct 5, 2015 at 17:07
  • Yes, but I send a header before sending the file. Also, consider uploading a file to a client as a task. I may have multiple tasks at once, and to send files on one socket I have to make sure files go in the right order, e.g. I don't want files getting scrambled because two tasks sent two files and they were interleaved. Somehow, I want to lock the socket for a task. Commented Oct 5, 2015 at 17:18

2 Answers


I'm not sure of the exact details of the protocol you're trying to implement, but the following should give you a good pattern that you can adapt to your needs.

My fictional protocol

When a client socket connects to my TCP server, I want to send them a file. But first I want to send a header, and at the end of the file, before the stream ends, I also want to send a footer. So the data written to the socket looks like:

==== HEADER ====
[FILE CONTENTS]
==== FOOTER ====

Implementing a Transform stream

All I want to do is transform the data coming out of a readable stream. Notice transform is the keyword here. I can use a Transform stream for this.

When creating a Transform stream, you can override two methods: _transform and _flush. _transform is called with each chunk coming off the readable stream; you can change the data, buffer it up, or whatever. _flush is called right after all the data from the readable has finished, so you can do any final cleanup there, or write a last bit of data out.

var Stream = require('stream');
var Util = require('util');

var MyProtocolStream = function () {

    this.writtenHeader = false;            // have we written the header yet?
    Stream.Transform.call(this);
};

Util.inherits(MyProtocolStream, Stream.Transform);

MyProtocolStream.prototype._transform = function (chunk, encoding, callback) {

    if (!this.writtenHeader) {
        this.push('==== HEADER ====\n');  // if we've not, send the header first
        this.writtenHeader = true;
    }
    this.push(chunk);                     // send the incoming file chunks along as-is
    callback();
};

MyProtocolStream.prototype._flush = function (callback) {

    this.push('==== FOOTER ====\n');      // Just before the stream closes, send footer 
    callback();
};

Using the MyProtocolStream

So now I have a stream that does what I want, I can simply pipe a file (or any readable stream for that matter) through my custom Transform stream and into any other Writable stream (such as a socket).

var Fs = require('fs');
var Net = require('net');

var server = Net.createServer(function (socket) {

    Fs.createReadStream('./example.txt')
        .pipe(new MyProtocolStream())
        .pipe(socket);
});

server.listen(8000);

Testing it out

I can test this out by adding some contents to example.txt:

This is a line
This is another line
This is the last line

I can spin up my server and then connect with telnet/nc:

$ telnet 127.0.0.1 8000
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
==== HEADER ====
This is a line
This is another line
This is the last line
==== FOOTER ====
Connection closed by foreign host.

So what about Duplex streams?

A Duplex stream is two streams embedded within one: data comes out of one side, and you write totally different data into the other. It's used where there is two-way communication with another entity (such as a TCP socket). In this example we don't need a Duplex stream, because data only flows in one direction:

file -> MyProtocolStream -> socket

Learning more

As Meir pointed out in the other answer, Substack's stream handbook is the canonical (and best) resource for streams along with the official docs. If you read them thoroughly and implement the examples yourself, you'll learn all you need to know about streams.

Sending multiple files over a single socket in series

If you want to write the output of multiple of these Transform streams into a single writable end, pipe() isn't going to work for you. Once EOF comes from the first stream, the downstream writable (the socket) will also be closed (unless you pass { end: false }). There is also no guarantee about the ordering of data events across the streams. So you need to aggregate the streams manually by listening to data/end events, starting to read one stream only after the previous one has finished:

var server = Net.createServer(function (socket) {

    var incoming1 = Fs.createReadStream('./example.txt')
            .pipe(new MyProtocolStream());

    var incoming2 = Fs.createReadStream('./example.txt')
            .pipe(new MyProtocolStream());

    var readStream = function (stream, callback) {

        stream.on('data', socket.write.bind(socket));
        stream.on('end', callback);
    };

    readStream(incoming1, function () {

        readStream(incoming2, function () {

            socket.end();
        });
    });
});

If you're nested callback-averse, you could also use promises:

var server = Net.createServer(function (socket) {

    var incoming1 = Fs.createReadStream('./example.txt')
            .pipe(new MyProtocolStream());

    var incoming2 = Fs.createReadStream('./example.txt')
            .pipe(new MyProtocolStream());

    var incoming3 = Fs.createReadStream('./example.txt')
            .pipe(new MyProtocolStream());

    var readStream = function (stream) {
        return new Promise(function (resolve, reject) {
            stream.on('data', socket.write.bind(socket));
            stream.on('end', resolve);
        });
    };

    readStream(incoming1)
    .then(function () {
        return readStream(incoming2);
    })
    .then(function () {
        return readStream(incoming3);
    })
    .then(function () {
        socket.end();
    });
});

4 Comments

Thanks, it is indeed working fine. Just some questions to understand Node.js better. I copied the code in server callback (FS.cre...) three times there, just to know what happens when more than one stream is piped into socket. They seem not to go in order, but they do not overlap each other. Is this reliable, or overlap could happen? I mean, getting two ==== HEADER ==== repeatedly. Also _flush is called only once? When all streams have finished? I thought it would be called when each of MyProtocolStreams is finished.
@thelastblack what are you trying to achieve by piping multiple readers? Do you want files to be multiplexed across the stream in parallel? Or one after the other in series?
I am trying to send files to clients. A send task is triggered by another service and executed by this service. Suppose I have a client: two send tasks are created for this client, and both are executed (using setTimeout and a delay of 100ms between them). Using a Transform stream, I want to pipe files to the client (with headers), but I'm also worried about concurrency issues as I have little experience in Node.js. How should I queue files to be sent to the client, in series, when my socket and streams and pipes are async? Note that tasks may come in the middle of another task.
Thanks, I got the idea. Sorry if my questions seemed noobish :)

This is a great resource on streams.

As for socket usage, when you send or receive a socket message, you send a type and a payload. So your general socket stream could send messages of type 'file_data' with the content while you have data to send, and at the end send a message of type 'eof' (end of file) with an empty payload.
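The answer doesn't specify the wire format, so as a sketch: the message types 'file_data' and 'eof' come from the answer above, but the framing here (a 1-byte type followed by a 4-byte big-endian payload length) is an assumption for illustration; substitute whatever your protocol actually mandates:

```javascript
// Hypothetical framing: [1-byte type][4-byte big-endian length][payload].
var TYPE_FILE_DATA = 0x01;   // assumed codes, not from any spec
var TYPE_EOF = 0x02;

function packMessage(type, payload) {

    payload = payload || Buffer.alloc(0);
    var header = Buffer.alloc(5);
    header.writeUInt8(type, 0);              // message type
    header.writeUInt32BE(payload.length, 1); // payload length
    return Buffer.concat([header, payload]);
}

function unpackMessage(buf) {

    var type = buf.readUInt8(0);
    var length = buf.readUInt32BE(1);
    return { type: type, payload: buf.slice(5, 5 + length) };
}
```

A sender would then write packMessage(TYPE_FILE_DATA, chunk) for each file chunk and finish with packMessage(TYPE_EOF), so the receiver knows the file boundary without relying on the socket closing.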

4 Comments

I have read that before, but it is not so obvious. It is much like the official docs: nodejs.org/api/stream.html
Did you try to follow the examples? I implemented them; they work and give a pretty good idea of how to use streams and create custom ones. Also, have a look at through and through2; they might simplify things for you.
Or a socket based stream, for example: npmjs.com/package/socket.io-stream
