Skip to content

Commit e103a18

Browse files
committed
Use package:buffer to parse incoming message frames.
1 parent 8355255 commit e103a18

File tree

2 files changed

+21
-99
lines changed

2 files changed

+21
-99
lines changed

lib/src/message_window.dart

Lines changed: 20 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import 'dart:collection';
2-
import 'dart:io';
32
import 'dart:typed_data';
43

4+
import 'package:buffer/buffer.dart';
5+
56
import 'server_messages.dart';
67

78
class MessageFrame {
@@ -22,106 +23,13 @@ class MessageFrame {
2223
116: () => new ParameterDescriptionMessage()
2324
};
2425

25-
int get bytesAvailable => packets.fold(0, (sum, v) => sum + v.lengthInBytes);
26-
List<Uint8List> packets = [];
2726
bool get hasReadHeader => type != null;
2827
int type;
2928
int expectedLength;
3029

3130
bool get isComplete => data != null || expectedLength == 0;
3231
Uint8List data;
3332

34-
ByteData consumeNextBytes(int length) {
35-
if (length == 0) {
36-
return null;
37-
}
38-
39-
if (bytesAvailable >= length) {
40-
var firstPacket = packets.first;
41-
42-
// The packet exactly matches the size of the bytes needed,
43-
// remove & return it.
44-
if (firstPacket.lengthInBytes == length) {
45-
packets.removeAt(0);
46-
return firstPacket.buffer
47-
.asByteData(firstPacket.offsetInBytes, firstPacket.lengthInBytes);
48-
}
49-
50-
if (firstPacket.lengthInBytes > length) {
51-
// We have to split up this packet and remove & return the first portion of it,
52-
// and replace it with the second portion of it.
53-
var remainingOffset = firstPacket.offsetInBytes + length;
54-
var bytesNeeded =
55-
firstPacket.buffer.asByteData(firstPacket.offsetInBytes, length);
56-
var bytesRemaining = firstPacket.buffer
57-
.asUint8List(remainingOffset, firstPacket.lengthInBytes - length);
58-
packets.removeAt(0);
59-
packets.insert(0, bytesRemaining);
60-
61-
return bytesNeeded;
62-
}
63-
64-
// Otherwise, the first packet can't fill this message, but we know
65-
// we have enough packets overall to fulfill it. So we can build
66-
// a total buffer by accumulating multiple packets into that buffer.
67-
// Each packet gets removed along the way, except for the last one,
68-
// in which case if it has more bytes available, it gets replaced
69-
// with the remaining bytes.
70-
71-
var builder = new BytesBuilder(copy: false);
72-
var bytesNeeded = length - builder.length;
73-
while (bytesNeeded > 0) {
74-
var packet = packets.removeAt(0);
75-
var bytesRemaining = packet.lengthInBytes;
76-
77-
if (bytesRemaining <= bytesNeeded) {
78-
builder.add(packet.buffer
79-
.asUint8List(packet.offsetInBytes, packet.lengthInBytes));
80-
} else {
81-
builder.add(
82-
packet.buffer.asUint8List(packet.offsetInBytes, bytesNeeded));
83-
packets.insert(
84-
0,
85-
packet.buffer
86-
.asUint8List(bytesNeeded, bytesRemaining - bytesNeeded));
87-
}
88-
89-
bytesNeeded = length - builder.length;
90-
}
91-
92-
return new Uint8List.fromList(builder.takeBytes()).buffer.asByteData();
93-
}
94-
95-
return null;
96-
}
97-
98-
int addBytes(Uint8List packet) {
99-
packets.add(packet);
100-
101-
if (!hasReadHeader) {
102-
ByteData headerBuffer = consumeNextBytes(HeaderByteSize);
103-
if (headerBuffer == null) {
104-
return packet.lengthInBytes;
105-
}
106-
107-
type = headerBuffer.getUint8(0);
108-
expectedLength = headerBuffer.getUint32(1) - 4;
109-
}
110-
111-
if (expectedLength == 0) {
112-
return packet.lengthInBytes - bytesAvailable;
113-
}
114-
115-
var body = consumeNextBytes(expectedLength);
116-
if (body == null) {
117-
return packet.lengthInBytes;
118-
}
119-
120-
data = body.buffer.asUint8List(body.offsetInBytes, body.lengthInBytes);
121-
122-
return packet.lengthInBytes - bytesAvailable;
123-
}
124-
12533
ServerMessage get message {
12634
var msgMaker =
12735
messageTypeMap[type] ?? () => new UnknownMessage()..code = type;
@@ -133,21 +41,34 @@ class MessageFrame {
13341
}
13442

13543
class MessageFramer {
44+
final _reader = new ByteDataReader();
13645
MessageFrame messageInProgress = new MessageFrame();
13746
final messageQueue = new Queue<MessageFrame>();
13847

13948
void addBytes(Uint8List bytes) {
140-
var offsetIntoBytesRead = 0;
49+
_reader.add(bytes);
50+
51+
bool evaluateNextMessage = true;
52+
while (evaluateNextMessage) {
53+
evaluateNextMessage = false;
54+
if (!messageInProgress.hasReadHeader &&
55+
_reader.remainingLength >= MessageFrame.HeaderByteSize) {
56+
messageInProgress.type = _reader.readUint8();
57+
messageInProgress.expectedLength = _reader.readUint32() - 4;
58+
}
14159

142-
do {
143-
var byteList = new Uint8List.view(bytes.buffer, offsetIntoBytesRead);
144-
offsetIntoBytesRead += messageInProgress.addBytes(byteList);
60+
if (messageInProgress.hasReadHeader &&
61+
messageInProgress.expectedLength > 0 &&
62+
_reader.remainingLength >= messageInProgress.expectedLength) {
63+
messageInProgress.data = _reader.read(messageInProgress.expectedLength);
64+
}
14565

14666
if (messageInProgress.isComplete) {
14767
messageQueue.add(messageInProgress);
14868
messageInProgress = new MessageFrame();
69+
evaluateNextMessage = true;
14970
}
150-
} while (offsetIntoBytesRead != bytes.length);
71+
}
15172
}
15273

15374
bool get hasMessage => messageQueue.isNotEmpty;

pubspec.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ environment:
99
sdk: ">=2.0.0 <3.0.0"
1010

1111
dependencies:
12+
buffer: ^1.0.5
1213
crypto: ^2.0.0
1314

1415
dev_dependencies:

0 commit comments

Comments
 (0)