I'm new to Kotlin coroutines.

Here is my code using a classic Thread:

    import com.google.gson.JsonElement
    import com.google.gson.JsonObject
    import com.google.gson.JsonParser
    import com.zaxxer.hikari.HikariConfig
    import com.zaxxer.hikari.HikariDataSource
    import okhttp3.*
    import okio.ByteString
    import org.slf4j.LoggerFactory
    import java.util.concurrent.atomic.AtomicInteger

    object BithumbSocketListener : WebSocketListener() {

        override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
            super.onFailure(webSocket, t, response)
            Thread {
                okHttpClient.newWebSocket(wsRequest, BithumbSocketListener)
            }.start()
        }

        override fun onMessage(webSocket: WebSocket, text: String) {
            super.onMessage(webSocket, text)
            logger.debug("ws_onMessage: text = $text")
        }

    }

    fun main(args: Array<String>) {
        currenciesList = currencies.split(",")
        currenciesList.forEach {
            OkHttpClient().newWebSocket(wsRequest, BithumbSocketListener)
        }
    } 

As you can see, I have a list of currencies (currenciesList). I iterate over it and call newWebSocket for every item. BithumbSocketListener is a singleton.

If there is a problem with a web socket, the onFailure callback is called, and I create a new web socket in a separate Java thread:

    Thread {
        okHttpClient.newWebSocket(wsRequest, BithumbSocketListener)
    }.start()

It works fine, but I want to replace this code with Kotlin coroutines. How can I do this?

Thanks.

1 Answer

Since you're processing an async stream of messages, you should port it to coroutines by implementing an actor, such as:

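    // In current kotlinx.coroutines, actor is an extension on CoroutineScope;
    // GlobalScope is used here only to keep the snippet short.
    val wsActor: SendChannel<String> = GlobalScope.actor<String> {
        for (msg in channel) {
            logger.info("Another message is in: $msg")
        }
    }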

From the type of wsActor you can see you're supposed to send messages to it. This is where the bridging code comes in:

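    class BithumbSocketListener(
        private val chan: SendChannel<String>
    ) : WebSocketListener() {
        override fun onMessage(webSocket: WebSocket, text: String) {
            // send() is a suspend function and cannot be called from this callback.
            // trySend() (kotlinx.coroutines 1.5+) never suspends, but it fails
            // if the channel cannot accept the element right away.
            chan.trySend(text)
        }

        override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
            // newWebSocket returns immediately, so no extra thread is needed.
            okHttpClient.newWebSocket(wsRequest, this)
        }
    }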

Note that, compared to your code, I don't start any new threads for retrying. This has nothing to do with porting to coroutines; your code doesn't need them either. newWebSocket is an async call that returns immediately.
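
One detail of the bridging code is worth illustrating: a listener callback is an ordinary function, so it can only hand a message to a channel without suspending. The sketch below shows how `trySend` behaves in that situation. `TextCallback` and `onText` are hypothetical stand-ins for `WebSocketListener` and `onMessage`, used so the example runs without OkHttp; it assumes kotlinx.coroutines 1.5 or later.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel

// Hypothetical stand-in for a callback-based listener such as WebSocketListener.
class TextCallback(private val sink: SendChannel<String>) {
    // Ordinary (non-suspending) function, like onMessage.
    // Returns true if the channel accepted the message.
    fun onText(text: String): Boolean = sink.trySend(text).isSuccess
}

fun main() {
    val rendezvous = Channel<String>()                 // default capacity: no buffer
    val buffered = Channel<String>(Channel.UNLIMITED)  // accepts elements while open

    println(TextCallback(rendezvous).onText("a"))  // no receiver is waiting
    println(TextCallback(buffered).onText("a"))    // buffered for a later receiver
    buffered.close()
    println(TextCallback(buffered).onText("b"))    // channel is already closed
}
```

If dropping a message while the consumer is busy is not acceptable, giving the channel a buffer (as with `Channel.UNLIMITED` here) is one option; whether an unbounded buffer is appropriate depends on how fast messages arrive.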

Finally, start the websockets for each currency:

    currenciesList.forEach {
        OkHttpClient().newWebSocket(wsRequest, BithumbSocketListener(wsActor))
    }
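
The `actor` builder is marked `@ObsoleteCoroutinesApi` in current kotlinx.coroutines, so a similar shape can be sketched with a plain `Channel` and one consuming coroutine. This is only an illustration under that assumption, not a drop-in replacement: `consume` is a hypothetical helper, and the `trySend` calls stand in for what each listener's `onMessage` would do.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Feeds the given messages through a channel and returns them
// in the order the single consumer coroutine received them.
fun consume(incoming: List<String>): List<String> = runBlocking {
    val messages = Channel<String>(Channel.UNLIMITED)
    val seen = mutableListOf<String>()

    // Plays the role of the actor body: one coroutine handles messages sequentially.
    val consumer = launch {
        for (msg in messages) {
            seen += msg
        }
    }

    // In the real program, each listener's onMessage would do this.
    incoming.forEach { messages.trySend(it) }
    messages.close()   // ends the consumer's for loop once the buffer is drained
    consumer.join()
    seen
}

fun main() {
    println(consume(listOf("BTC tick", "ETH tick")))
}
```

Because a single coroutine drains the channel, messages from all sockets are handled one at a time, which is the same property the actor provides.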