diff --git a/.github/workflows/arduino-lint.yml b/.github/workflows/arduino-lint.yml index a85a4b5..a897cdb 100644 --- a/.github/workflows/arduino-lint.yml +++ b/.github/workflows/arduino-lint.yml @@ -24,6 +24,6 @@ jobs: uses: arduino/arduino-lint-action@v2 with: compliance: strict - library-manager: submit # remember to change to 'update' after the library is published on the libraries index + library-manager: update # remember to change to 'update' after the library is published on the libraries index # Always use this setting for official repositories. Remove for 3rd party projects. official: true \ No newline at end of file diff --git a/.github/workflows/compile-examples.yml b/.github/workflows/compile-examples.yml index da9e4fe..b0443ba 100644 --- a/.github/workflows/compile-examples.yml +++ b/.github/workflows/compile-examples.yml @@ -61,6 +61,8 @@ jobs: artifact-name-suffix: arduino-renesas_portenta-portenta_c33 - fqbn: arduino:renesas_uno:unor4wifi artifact-name-suffix: arduino-renesas_uno-unor4wifi + - fqbn: arduino:zephyr:unoq + artifact-name-suffix: arduino-zephyr-unoq steps: - name: Checkout diff --git a/examples/decoder_tests/decoder_tester.h b/examples/decoder_tests/decoder_tester.h index 266064f..61c00b0 100644 --- a/examples/decoder_tests/decoder_tester.h +++ b/examples/decoder_tests/decoder_tester.h @@ -21,10 +21,40 @@ class DecoderTester { DecoderTester(RpcDecoder<>& _d): decoder(_d){} + void first_response_info() { + if (!decoder.response_queued()) { + Serial.println("No response queued"); + return; + } + Serial.println("-- First response info --"); + Serial.print("RESP OFFSET: "); + Serial.println(static_cast(decoder._response_offset)); + Serial.print("RESP SIZE: "); + Serial.println(static_cast(decoder._response_size)); + } + + size_t get_response_size() { + return decoder._response_size; + } + + size_t get_response_offset() { + return decoder._response_offset; + } + + template + bool get_response(const uint32_t msg_id, RType& result, RpcError& error) { + return decoder.get_response(msg_id, result, error); + } + void crop_bytes(size_t size, size_t offset){ decoder.consume(size, offset); } + void pop_first() { + uint8_t temp_buffer[512]; + decoder.pop_packet(temp_buffer, 512); + } + void print_raw_buf(){ Serial.print("Decoder raw buffer content: "); diff --git a/examples/decoder_tests/decoder_tests.ino b/examples/decoder_tests/decoder_tests.ino index b823bf9..46e92c0 100644 --- a/examples/decoder_tests/decoder_tests.ino +++ b/examples/decoder_tests/decoder_tests.ino @@ -48,7 +48,7 @@ void runDecoderTest(const char* label) { Serial.println("-- Done --\n"); } -void runDecoderConsumeTest(const char* label, size_t second_packet_sz) { +void runDecoderConsumeTest(const char* label, size_t expected_2nd_pack_size) { Serial.println(label); print_buf(); @@ -63,20 +63,184 @@ void runDecoderConsumeTest(const char* label, size_t second_packet_sz) { delay(50); } + dt.first_response_info(); + + while (!decoder.response_queued()) { + Serial.println("1st response not ready"); + decoder.decode(); + delay(50); + } + size_t pack_size = decoder.get_packet_size(); Serial.print("1st Packet size: "); Serial.println(pack_size); + dt.first_response_info(); + + if ((dt.get_response_offset()!=pack_size)||(dt.get_response_size()!=expected_2nd_pack_size)) { + Serial.println("ERROR parsing 1st response\n"); + return; + } + + Serial.print("Consuming 2nd packet of given size: "); + Serial.println(dt.get_response_size()); + + dt.crop_bytes(dt.get_response_size(), dt.get_response_offset()); + + dt.print_raw_buf(); + + Serial.println("-- Done --\n"); +} + +void runDecoderPopFirstTest(const char* label, size_t expected_2nd_pack_size) { + Serial.println(label); + + print_buf(); + DummyTransport dummy_transport(packer.data(), packer.size()); + RpcDecoder<> decoder(dummy_transport); + + DecoderTester dt(decoder); + + while (!decoder.packet_incoming()) { + Serial.println("Packet not ready"); + decoder.decode(); + delay(50); + } + + while (!decoder.response_queued()) { + Serial.println("1st response not ready"); + decoder.decode(); + delay(50); + } + + dt.first_response_info(); + + size_t pack_size = decoder.get_packet_size(); + Serial.print("Consuming 1st Packet of size: "); + Serial.println(pack_size); + dt.pop_first(); + dt.print_raw_buf(); + + dt.first_response_info(); + + if ((dt.get_response_offset()!=0)||(dt.get_response_size()!=expected_2nd_pack_size)) { + Serial.println("ERROR moving 1st response\n"); + return; + } + Serial.print("Consuming 2nd packet of given size: "); - Serial.println(second_packet_sz); + Serial.println(dt.get_response_size()); + + dt.crop_bytes(dt.get_response_size(), dt.get_response_offset()); + + dt.print_raw_buf(); + dt.first_response_info(); + + Serial.println("-- Done --\n"); +} + +void runDecoderGetResponseTest(const char* label, size_t expected_2nd_pack_size, int _id) { + Serial.println(label); + + print_buf(); + DummyTransport dummy_transport(packer.data(), packer.size()); + RpcDecoder<> decoder(dummy_transport); + + DecoderTester dt(decoder); + + while (!decoder.packet_incoming()) { + Serial.println("Packet not ready"); + decoder.decode(); + delay(50); + } + + dt.first_response_info(); + + while (!decoder.response_queued()) { + Serial.println("1st response not ready"); + decoder.decode(); + delay(50); + } + + size_t pack_size = decoder.get_packet_size(); + Serial.print("1st Packet size: "); + Serial.println(pack_size); + + dt.first_response_info(); + + if ((dt.get_response_offset()!=pack_size)||(dt.get_response_size()!=expected_2nd_pack_size)) { + Serial.println("ERROR parsing 1st response\n"); + return; + } + + Serial.print("Getting response (2nd packet) size: "); + Serial.println(dt.get_response_size()); + + int res; + RpcError _err; + dt.get_response(_id, res, _err); + + Serial.print("Result: "); + Serial.println(res); + + dt.print_raw_buf(); + + Serial.println("-- Done --\n"); +} + + +void runDecoderGetTopResponseTest(const char* label, size_t expected_size, int _id) { + Serial.println(label); + + print_buf(); + DummyTransport dummy_transport(packer.data(), packer.size()); + RpcDecoder<> decoder(dummy_transport); + + DecoderTester dt(decoder); + + while (!decoder.packet_incoming()) { + Serial.println("Packet not ready"); + decoder.decode(); + delay(50); + } - dt.crop_bytes(second_packet_sz, pack_size); + dt.first_response_info(); + + while (!decoder.response_queued()) { + Serial.println("1st response not ready"); + decoder.decode(); + delay(50); + } + + size_t pack_size = decoder.get_packet_size(); + Serial.print("1st Packet size: "); + Serial.println(pack_size); + + dt.first_response_info(); + + if ((dt.get_response_offset()!=0)||(dt.get_response_size()!=expected_size)) { + Serial.println("ERROR parsing 1st response\n"); + return; + } + + Serial.print("Getting response size: "); + Serial.println(dt.get_response_size()); + + int res; + RpcError _err; + dt.get_response(_id, res, _err); + + Serial.print("Result: "); + Serial.println(res); dt.print_raw_buf(); + dt.first_response_info(); + Serial.println("-- Done --\n"); } + void testNestedArrayRequest() { packer.clear(); MsgPack::arr_size_t outer_arr(3); @@ -166,6 +330,63 @@ void testMultipleRpcPackets() { } +// Multiple RPCs in one buffer. Pop the 1st request and then the 2nd response +void testPopRpcPackets() { + packer.clear(); + MsgPack::arr_size_t req_sz(4); + MsgPack::arr_size_t par_sz(2); + MsgPack::arr_size_t resp_sz(4); + MsgPack::object::nil_t nil; + + // 1st request + packer.serialize(req_sz, 0, 1, "sum", par_sz, 10, 20); + // 2nd response + packer.serialize(resp_sz, 1, 1, nil, 42); + // 3rd request + packer.serialize(req_sz, 0, 2, "echo", par_sz, "Hello", true); + + runDecoderPopFirstTest("== Test: Pop-first packet ==", 5); + +} + +// Multiple RPCs in one buffer. Get the response in the buffer +void testGetResponsePacket() { + packer.clear(); + MsgPack::arr_size_t req_sz(4); + MsgPack::arr_size_t par_sz(2); + MsgPack::arr_size_t resp_sz(4); + MsgPack::object::nil_t nil; + + // 1st request + packer.serialize(req_sz, 0, 1, "sum", par_sz, 10, 20); + // 2nd response + packer.serialize(resp_sz, 1, 1, nil, 101); + // 3rd request + packer.serialize(req_sz, 0, 2, "echo", par_sz, "Hello", true); + + runDecoderGetResponseTest("== Test: Get response packet ==", 5, 1); + +} + +// Multiple RPCs in one buffer. The response is top of the buffer +void testGetTopResponsePacket() { + packer.clear(); + MsgPack::arr_size_t req_sz(4); + MsgPack::arr_size_t par_sz(2); + MsgPack::arr_size_t resp_sz(4); + MsgPack::object::nil_t nil; + + // 1st response + packer.serialize(resp_sz, 1, 1, nil, 101); + // 2nd request + packer.serialize(req_sz, 0, 2, "echo", par_sz, "Hello", true); + // 3rd request + packer.serialize(req_sz, 0, 1, "sum", par_sz, 30, 30); + + runDecoderGetTopResponseTest("== Test: Get top response packet ==", 5, 1); + +} + // Binary parameter (e.g., binary blob) void testBinaryParam() { packer.clear(); @@ -225,6 +446,9 @@ void setup() { testDeepNestedStructure(); testArrayOfMapsResponse(); testMultipleRpcPackets(); + testPopRpcPackets(); + testGetResponsePacket(); + testGetTopResponsePacket(); testBinaryParam(); testExtensionParam(); testCombinedComplexBuffer(); diff --git a/library.json b/library.json index 1fa6098..9666cda 100644 --- a/library.json +++ b/library.json @@ -4,14 +4,14 @@ "description": "A MessagePack RPC library for Arduino", "repository": { "type": "git", - "url": "https://github.com/bcmi-labs/Arduino_RPClite" + "url": "https://github.com/arduino-libraries/Arduino_RPCLite" }, "authors": { "name": "Lucio Rossi", "url": "https://github.com/eigen-value", "maintainer": true }, - "version": "0.1.3", + "version": "0.2.1", "license": "MPL2.0", "frameworks": "arduino", "platforms": "*", diff --git a/library.properties b/library.properties index 8ebdfb6..de0055a 100644 --- a/library.properties +++ b/library.properties @@ -1,10 +1,10 @@ name=Arduino_RPClite -version=0.1.3 +version=0.2.1 author=Arduino, Lucio Rossi (eigen-value) maintainer=Arduino, Lucio Rossi (eigen-value) sentence=A MessagePack RPC library for Arduino paragraph=allows to create a client/server architecture using MessagePack as the serialization format. It follows the MessagePack-RPC protocol specification. It is designed to be lightweight and easy to use, making it suitable for embedded systems and IoT applications. category=Communication -url=https://www.arduino.cc/ +url=https://github.com/arduino-libraries/Arduino_RPCLite architectures=* depends=MsgPack (>=0.4.2) diff --git a/src/client.h b/src/client.h index 8d95748..04f26da 100644 --- a/src/client.h +++ b/src/client.h @@ -41,7 +41,8 @@ class RPCClient { } // blocking call - while (!get_response(msg_id_wait, result)){ + RpcError tmp_error; + while (!get_response(msg_id_wait, result, tmp_error)) { //delay(1); } @@ -60,18 +61,18 @@ class RPCClient { } template - bool get_response(const uint32_t wait_id, RType& result) { - RpcError tmp_error; + bool get_response(const uint32_t wait_id, RType& result, RpcError& error) { decoder->decode(); - if (decoder->get_response(wait_id, result, tmp_error)) { - lastError.code = tmp_error.code; - lastError.traceback = tmp_error.traceback; + if (decoder->get_response(wait_id, result, error)) { + lastError.copy(error); return true; } return false; } + uint32_t get_discarded_packets() const {return decoder->get_discarded_packets();} + }; #endif //RPCLITE_CLIENT_H diff --git a/src/decoder.h b/src/decoder.h index f3ab3d3..d0716ca 100644 --- a/src/decoder.h +++ b/src/decoder.h @@ -6,20 +6,22 @@ This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ #ifndef RPCLITE_DECODER_H #define RPCLITE_DECODER_H +// MsgPack log level +#define DEBUGLOG_DEFAULT_LOG_LEVEL_WARN + #include "MsgPack.h" #include "transport.h" #include "rpclite_utils.h" +#include "error.h" using namespace RpcUtils::detail; #define MIN_RPC_BYTES 4 -#define CHUNK_SIZE 32 template class RpcDecoder { @@ -57,34 +59,68 @@ class RpcDecoder { template bool get_response(const uint32_t msg_id, RType& result, RpcError& error) { - if (!packet_incoming() || _packet_type!=RESP_MSG) return false; + if (!response_queued()) return false; MsgPack::Unpacker unpacker; unpacker.clear(); - size_t res_size = get_packet_size(); - if (!unpacker.feed(_raw_buffer, res_size)) return false; + if (!unpacker.feed(_raw_buffer + _response_offset, _response_size)) return false; MsgPack::arr_size_t resp_size; int resp_type; uint32_t resp_id; if (!unpacker.deserialize(resp_size, resp_type, resp_id)) return false; - if (resp_size.size() != RESPONSE_SIZE) return false; - if (resp_type != RESP_MSG) return false; + + // ReSharper disable once CppDFAUnreachableCode if (resp_id != msg_id) return false; + // msg_id OK packet will be consumed. + if (resp_type != RESP_MSG) { + // This should never happen + error.code = PARSING_ERR; + error.traceback = "Unexpected response type"; + consume(_response_size, _response_offset); + if (_response_offset == 0) reset_packet(); + _discarded_packets++; + return true; + } + + if (resp_size.size() != RESPONSE_SIZE) { + // This should never happen + error.code = PARSING_ERR; + error.traceback = "Unexpected RPC response size"; + consume(_response_size, _response_offset); + if (_response_offset == 0) reset_packet(); + _discarded_packets++; + return true; + } + MsgPack::object::nil_t nil; if (unpacker.unpackable(nil)){ // No error - if (!unpacker.deserialize(nil, result)) return false; + if (!unpacker.deserialize(nil, result)) { + error.code = PARSING_ERR; + error.traceback = "Result not parsable (check type)"; + consume(_response_size, _response_offset); + if (_response_offset == 0) reset_packet(); + _discarded_packets++; + return true; + } } else { // RPC returned an error - if (!unpacker.deserialize(error, nil)) return false; + if (!unpacker.deserialize(error, nil)) { + error.code = PARSING_ERR; + error.traceback = "RPC Error not parsable (check type)"; + consume(_response_size, _response_offset); + if (_response_offset == 0) reset_packet(); + _discarded_packets++; + return true; + } } - reset_packet(); - consume(res_size); - return true; + if (_response_offset == 0) reset_packet(); + consume(_response_size, _response_offset); + return true; } bool send_response(const MsgPack::Packer& packer) const { @@ -103,8 +139,7 @@ class RpcDecoder { unpacker.clear(); if (!unpacker.feed(_raw_buffer, _packet_size)) { // feed should not fail at this point - consume(_packet_size); - reset_packet(); + discard(); return ""; }; @@ -113,27 +148,24 @@ class RpcDecoder { MsgPack::arr_size_t req_size; if (!unpacker.deserialize(req_size, msg_type)) { - consume(_packet_size); - reset_packet(); + discard(); return ""; // Header not unpackable } + // ReSharper disable once CppDFAUnreachableCode if (msg_type == CALL_MSG && req_size.size() == REQUEST_SIZE) { uint32_t msg_id; if (!unpacker.deserialize(msg_id, method)) { - consume(_packet_size); - reset_packet(); + discard(); return ""; // Method not unpackable } } else if (msg_type == NOTIFY_MSG && req_size.size() == NOTIFY_SIZE) { if (!unpacker.deserialize(method)) { - consume(_packet_size); - reset_packet(); + discard(); return ""; // Method not unpackable } } else { - consume(_packet_size); - reset_packet(); + discard(); return ""; // Invalid request size/type } @@ -167,35 +199,50 @@ class RpcDecoder { void parse_packet(){ - if (packet_incoming()){return;} + size_t offset = 0; + + if (packet_incoming()) { + if (response_queued()) return; // parsing complete + offset = _response_offset; // looking for a RESP + } size_t bytes_checked = 0; size_t container_size; - int type; + int type = NO_MSG; MsgPack::Unpacker unpacker; - while (bytes_checked < _bytes_stored){ + while (bytes_checked + offset < _bytes_stored){ bytes_checked++; unpacker.clear(); - if (!unpacker.feed(_raw_buffer, bytes_checked)) continue; + if (!unpacker.feed(_raw_buffer + offset, bytes_checked)) continue; if (unpackTypedArray(unpacker, container_size, type)) { if (type != CALL_MSG && type != RESP_MSG && type != NOTIFY_MSG) { - consume(bytes_checked); + consume(bytes_checked, offset); + _discarded_packets++; break; // Not a valid RPC type (could be type=WRONG_MSG) } if ((type == CALL_MSG && container_size != REQUEST_SIZE) || (type == RESP_MSG && container_size != RESPONSE_SIZE) || (type == NOTIFY_MSG && container_size != NOTIFY_SIZE)) { - consume(bytes_checked); + consume(bytes_checked, offset); + _discarded_packets++; break; // Not a valid RPC format } - _packet_type = type; - _packet_size = bytes_checked; + if (offset == 0) { + _packet_type = type; + _packet_size = bytes_checked; + } + + if (type == RESP_MSG) { + _response_offset = offset; + _response_size = bytes_checked; // response queued + } else { + _response_offset = offset + bytes_checked; + } + break; - } else { - continue; } } @@ -204,12 +251,18 @@ class RpcDecoder { bool packet_incoming() const { return _packet_size >= MIN_RPC_BYTES; } + bool response_queued() const { + return _response_size > 0; + } + int packet_type() const { return _packet_type; } size_t get_packet_size() const { return _packet_size;} size_t size() const {return _bytes_stored;} + uint32_t get_discarded_packets() const {return _discarded_packets;} + friend class DecoderTester; private: @@ -218,7 +271,10 @@ class RpcDecoder { size_t _bytes_stored = 0; int _packet_type = NO_MSG; size_t _packet_size = 0; + size_t _response_offset = 0; + size_t _response_size = 0; uint32_t _msg_id = 0; + uint32_t _discarded_packets = 0; bool buffer_full() const { return _bytes_stored == BufferSize; } @@ -252,25 +308,41 @@ class RpcDecoder { return consume(packet_size); } + void discard() { + consume(_packet_size); + reset_packet(); + _discarded_packets++; + } void reset_packet() { _packet_type = NO_MSG; _packet_size = 0; } -size_t consume(size_t size, size_t offset = 0) { - // Boundary checks - if (offset + size > _bytes_stored || size == 0) return 0; - - size_t remaining_bytes = _bytes_stored - size; - for (size_t i=offset; i _bytes_stored || size == 0) return 0; + + size_t remaining_bytes = _bytes_stored - size; + for (size_t i=offset; i= offset + size) { + _response_offset -= size; + } else { + reset_response(); + } + + _bytes_stored = remaining_bytes; + return size; + } }; -#endif \ No newline at end of file +#endif diff --git a/src/error.h b/src/error.h index 925b30a..d177c52 100644 --- a/src/error.h +++ b/src/error.h @@ -14,9 +14,13 @@ #include +// MsgPack log level +#define DEBUGLOG_DEFAULT_LOG_LEVEL_WARN + #include "MsgPack.h" #define NO_ERR 0x00 +#define PARSING_ERR 0xFC #define MALFORMED_CALL_ERR 0xFD #define FUNCTION_NOT_FOUND_ERR 0xFE #define GENERIC_ERR 0xFF @@ -34,7 +38,12 @@ struct RpcError { RpcError(const int c, MsgPack::str_t tb) : code(c), traceback(std::move(tb)) {} + void copy(const RpcError& err) { + code = err.code; + traceback = err.traceback; + } + MSGPACK_DEFINE(code, traceback); // -> [code, traceback] }; -#endif \ No newline at end of file +#endif diff --git a/src/server.h b/src/server.h index 3200b40..f3a5803 100644 --- a/src/server.h +++ b/src/server.h @@ -87,6 +87,8 @@ class RPCServer { } + uint32_t get_discarded_packets() const {return decoder->get_discarded_packets();} + private: RpcDecoder<>* decoder = nullptr; RpcFunctionDispatcher dispatcher{};