1

I'm currently facing a challenging issue while working on a Dart application that reads messages over the Zenoh protocol. Here's a breakdown of my problem:

  1. My Dart code executes native C code via Dart:ffi. (without issue)
  2. The C code then calllbacks the Dart code with data (without issue for direct callback)

The problem arises when I introduce the Zenoh library in my C code, which creates a "subscriber." This subscriber spawns threads whenever messages are received. These threads are responsible for handling the incoming messages and should callback the contents to my Dart code.

However, when I try to invoke a Dart callback function from these threads, I encounter the following error: error: Cannot invoke native callback outside an isolate.

In an attempt to solve this issue, I decided to use Dart:isolate and utilize send/ReceivePorts to send the data from the C threads. Unfortunately, this approach resulted in a different error: error: Dart_NewSendPort expects there to be a current isolate. Did you forget to call Dart_CreateIsolateGroup or Dart_EnterIsolate?

I'm unsure if I'm approaching the problem correctly, and I'm not entirely sure if I fully understand it. I've dedicated nearly four days straight to resolving this issue, and I'm feeling quite stuck and frustrated.

I would greatly appreciate any insights, suggestions, or guidance on how to overcome this obstacle and successfully invoke Dart callbacks from the Zenoh C threads. Thank you in advance for your assistance!


Dart Code:

import 'dart:ffi';
import 'package:ffi/ffi.dart';
import 'dart:isolate';

// typedef Request Callback 
typedef RequestCallbackC = Void Function(Int32 port);
typedef RequestCallbackDart = void Function(int port);

class CWrapperZenoh{
  late final DynamicLibrary _zenohLib;
  late final RequestCallbackDart _requestCallbackDart;
  // for native c callback
  late final ReceivePort _receivePort;
  late final SendPort _sendPort;

  CWrapperZenoh(){
    _zenohLib = DynamicLibrary.open("bin/include/lib_c_lib.so");
    _requestCallbackDart = _zenohLib.lookup<NativeFunction<RequestCallbackC>> 
      ('request_callback').asFunction();

    _receivePort = ReceivePort();
    _sendPort = _receivePort.sendPort;

    // listen
    _receivePort.listen((message) {
      print("listen message\n");
      if (message is String) {
        dartCallbackString(message);
      } else if (message is int){
        print(message);
      }
    });
  }

// application calls this to initiate
callFunctions(){
  print("sendport: ${_sendPort.nativePort}");
  _requestCallbackDart(_sendPort.nativePort);
}

static void dartCallbackString(String s) {
  print("Received string from C: $s\n");
}

}


C library:

    #include <string.h>
    #include <stdio.h>
    #include <unistd.h>
    #include "zenoh.h"
    #include "include/dart_api.h"
    #include "include/dart_api_dl.h"
    #include <pthread.h>

    // define Dart callback function
    typedef void (*DartFunction)(const char*);

    z_owned_config_t* config_ptr;
    z_owned_session_t* sessionPtr;
    z_owned_subscriber_t* sub;
    z_owned_closure_sample_t callback;
    Dart_Port dart_port;

    DartFunction dartFunction;

    
    void callbackFuncToDart(const z_sample_t *sample, void *arg);

    void request_callback(Dart_Port port){
        dart_port = port;

        // config
        config_ptr = malloc(sizeof(z_owned_config_t));
        if (config_ptr != NULL) {
            *config_ptr = zc_config_from_file("bin/include/DEFAULT_CONFIG_CONNECT.json5");
        }
        printf("config created\n");
        fflush(stdout);


        // open session
        sessionPtr = (z_owned_session_t*)malloc(sizeof(z_owned_session_t));
        *sessionPtr = z_open(config_ptr);
        
        // subscribing
        z_owned_closure_sample_t callback = z_closure(callbackFuncToDart);
        
        sub = malloc(sizeof(z_owned_subscriber_t));
        *sub = z_declare_subscriber(z_loan(*sessionPtr), z_keyexpr("geometry_msgs/msg/actualSpeed"), z_move(callback), NULL);
    }

    // callback to Dart
    void callbackFuncToDart(const z_sample_t *sample, void *arg) {
        
        // Create a SendPort to communicate with Dart isolate
        Dart_Handle sendPort = Dart_NewSendPort(dart_port);

        // Check if creating the SendPort was successful
        if (!Dart_IsError(sendPort)) {
            Dart_CObject message;
            message.type = Dart_CObject_kInt32;
            message.value.as_int32 = 1;

            // Send the message using the SendPort
            if (Dart_PostCObject(dart_port, &message)){
                printf("Message sent\n");
                fflush(stdout);
            } else {
                printf("Failed to create SendPort\n");
                fflush(stdout);
            }   
        }

        //--Removed old irelevant code. comment relevant for my first threaded callback attempt--
        //dartFunction(str);    // different thread id than Darts "Main isolate". therefore i get the following: "error: Cannot invoke native callback outside an isolate."
                                            
    }

1 Answer 1

2

A lot have happened since i posted the question and im not sure what caused this exact issue. nonetheless i made it work with the following changes.

In my C library i expose this function

// Initialize `dart_api_dl.h`
DART_EXPORT intptr_t InitDartApiDL(void* data) {
  return Dart_InitializeApiDL(data);
}

And then i call it in my Dart constructor

ZenohWrapper() {
  _zenohLib = DynamicLibrary.open('dirToLib/lib.so');
  _initializeZenohFunctions();
  final initializeApi = _zenohLib.lookupFunction<IntPtr Function(Pointer<Void>), int Function(Pointer<Void>)>("InitDartApiDL");
}    
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.