Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 20 additions & 99 deletions lib/src/message_window.dart
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import 'dart:collection';
import 'dart:io';
import 'dart:typed_data';

import 'package:buffer/buffer.dart';

import 'server_messages.dart';

class MessageFrame {
Expand All @@ -22,106 +23,13 @@ class MessageFrame {
116: () => new ParameterDescriptionMessage()
};

int get bytesAvailable => packets.fold(0, (sum, v) => sum + v.lengthInBytes);
List<Uint8List> packets = [];
bool get hasReadHeader => type != null;
int type;
int expectedLength;

bool get isComplete => data != null || expectedLength == 0;
Uint8List data;

ByteData consumeNextBytes(int length) {
if (length == 0) {
return null;
}

if (bytesAvailable >= length) {
var firstPacket = packets.first;

// The packet exactly matches the size of the bytes needed,
// remove & return it.
if (firstPacket.lengthInBytes == length) {
packets.removeAt(0);
return firstPacket.buffer
.asByteData(firstPacket.offsetInBytes, firstPacket.lengthInBytes);
}

if (firstPacket.lengthInBytes > length) {
// We have to split up this packet and remove & return the first portion of it,
// and replace it with the second portion of it.
var remainingOffset = firstPacket.offsetInBytes + length;
var bytesNeeded =
firstPacket.buffer.asByteData(firstPacket.offsetInBytes, length);
var bytesRemaining = firstPacket.buffer
.asUint8List(remainingOffset, firstPacket.lengthInBytes - length);
packets.removeAt(0);
packets.insert(0, bytesRemaining);

return bytesNeeded;
}

// Otherwise, the first packet can't fill this message, but we know
// we have enough packets overall to fulfill it. So we can build
// a total buffer by accumulating multiple packets into that buffer.
// Each packet gets removed along the way, except for the last one,
// in which case if it has more bytes available, it gets replaced
// with the remaining bytes.

var builder = new BytesBuilder(copy: false);
var bytesNeeded = length - builder.length;
while (bytesNeeded > 0) {
var packet = packets.removeAt(0);
var bytesRemaining = packet.lengthInBytes;

if (bytesRemaining <= bytesNeeded) {
builder.add(packet.buffer
.asUint8List(packet.offsetInBytes, packet.lengthInBytes));
} else {
builder.add(
packet.buffer.asUint8List(packet.offsetInBytes, bytesNeeded));
packets.insert(
0,
packet.buffer
.asUint8List(bytesNeeded, bytesRemaining - bytesNeeded));
}

bytesNeeded = length - builder.length;
}

return new Uint8List.fromList(builder.takeBytes()).buffer.asByteData();
}

return null;
}

int addBytes(Uint8List packet) {
packets.add(packet);

if (!hasReadHeader) {
ByteData headerBuffer = consumeNextBytes(HeaderByteSize);
if (headerBuffer == null) {
return packet.lengthInBytes;
}

type = headerBuffer.getUint8(0);
expectedLength = headerBuffer.getUint32(1) - 4;
}

if (expectedLength == 0) {
return packet.lengthInBytes - bytesAvailable;
}

var body = consumeNextBytes(expectedLength);
if (body == null) {
return packet.lengthInBytes;
}

data = body.buffer.asUint8List(body.offsetInBytes, body.lengthInBytes);

return packet.lengthInBytes - bytesAvailable;
}

ServerMessage get message {
var msgMaker =
messageTypeMap[type] ?? () => new UnknownMessage()..code = type;
Expand All @@ -133,21 +41,34 @@ class MessageFrame {
}

class MessageFramer {
final _reader = new ByteDataReader();
MessageFrame messageInProgress = new MessageFrame();
final messageQueue = new Queue<MessageFrame>();

void addBytes(Uint8List bytes) {
var offsetIntoBytesRead = 0;
_reader.add(bytes);

bool evaluateNextMessage = true;
while (evaluateNextMessage) {
evaluateNextMessage = false;
if (!messageInProgress.hasReadHeader &&
_reader.remainingLength >= MessageFrame.HeaderByteSize) {
messageInProgress.type = _reader.readUint8();
messageInProgress.expectedLength = _reader.readUint32() - 4;
}

do {
var byteList = new Uint8List.view(bytes.buffer, offsetIntoBytesRead);
offsetIntoBytesRead += messageInProgress.addBytes(byteList);
if (messageInProgress.hasReadHeader &&
messageInProgress.expectedLength > 0 &&
_reader.remainingLength >= messageInProgress.expectedLength) {
messageInProgress.data = _reader.read(messageInProgress.expectedLength);
}

if (messageInProgress.isComplete) {
messageQueue.add(messageInProgress);
messageInProgress = new MessageFrame();
evaluateNextMessage = true;
}
} while (offsetIntoBytesRead != bytes.length);
}
}

bool get hasMessage => messageQueue.isNotEmpty;
Expand Down
1 change: 1 addition & 0 deletions pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ environment:
sdk: ">=2.0.0 <3.0.0"

dependencies:
buffer: ^1.0.5
crypto: ^2.0.0

dev_dependencies:
Expand Down