Количество просмотров288
13 сентября 2021

Kotlin Native. Работаем с потоками с корутинами и без. Ч2

Всем доброго дня! С вами Анна Жаркова, ведущий мобильный разработчик компании Usetech. Продолжаем рассматривать способы многопоточный работы в Kotlin Native.

Ранее мы рассматривали некоторые нюансы работы с корутинами, как работать с Worker и AtomicReference.

Еще одним возможным API для работы с многопоточностью является DetachedObjectGraph.

//код Supranatural concurrency

override fun <T, V> execute(jobInput: T, job: (T) -> V): Future<V> {
        val deferred = DeferredFuture<V>()
        val detached = DetachedObjectGraph { Triple(jobInput, job, deferred).toImmutable() }.asCPointer()

        dispatch_async_f(dispatch_get_main_queue(), detached, staticCFunction { it: COpaquePointer? ->
            initRuntimeIfNeeded()
            val attached = DetachedObjectGraph<Triple<T, (T) -> V, DeferredFuture<V>>>(it).attach()
            val result = attached.second(attached.first)
            attached.third.setValue(result)
        })
        return deferred
    }

Чем оно примечательно, так тем, что это способ передавать объекты между потоками без заморозки. За счет чего это достигается:

1. Входные параметры, ссылки на блоки и коллбэки, которые надо вызвать и передать в другом потоке, кладутся в качестве общего содержимого вовнутрь DetachedObjectGraph.

2. Затем получаем указатель на открепленный подграф с нашим объектом с помощью asCPointer. И уже внутри нужного потока вызываем staticCFunction, где в качестве параметра работаем с нашим указателем.

3. Для того, чтобы извлечь из графа параметры, надо его прикрепить с помощью команды attach, извлечь упакованные данные и преобразовать нужным способом.

4. А вот вернуть коллбэк нужно вне этого потока.

Важно! Attach открепленного объекта можно вызвать один раз. Иначе можно получить исключение IllegalStateException: Illegal transfer state. Поэтому ссылка на граф должна зануляться после окончания работы блока.
Также при упаковке объекта полезно указать TransferMode.SAFE. Но в API DetachedObjectGraph этот параметр используется по умолчанию.

Для удобства можно сделать обертку, которая позволит работать с изменяемыми элементами в разных потоках, соединив и worker, и AtomicReference, и DetachedGraphObject:

class SharedDetachedObject<T:Any>(producer: () -> T) {
    private val adog : AtomicReference<DetachedObjectGraph<Any>?>
    private val lock = Any()

    init {
        val detachedObjectGraph = DetachedObjectGraph { producer() as Any }.freeze()
        adog = AtomicReference(detachedObjectGraph.freeze())
    }

    fun <R> access(block: (T) -> R): R = trySynchronized(lock){
        val holder = FreezableAtomicReference<Any?>(null)
        val producer = { grabAccess(holder, block) as Any }
        adog.value = DetachedObjectGraph(TransferMode.SAFE, producer).freeze()
        val result = holder.value!!
        holder.value = null
        result as R
    }

    private fun <R> grabAccess(holder:FreezableAtomicReference<Any?>, block: (T) -> R):T{
        val attach = adog.value!!.attach()
        val t = attach as T
        holder.value = block(t)
        return t
    }
}

Красиво и сложно, местами даже слишком.


Теперь рассмотрим, как наладить коммуникацию между контекстами и частями нативного запроса нашего клиента с помощью корутин. Для этого нам доступен следующий функционал:

• Channels

• Flows

• CompletableDeffered

Начнем с CompletaleDeferred. Данный механизм позволит нам awaitable, результат работы которого мы можем вернуть вместо callback в suspend функции:

class DefferedResponseReader: NSObject(), NSURLSessionDataDelegateProtocol {
    private var chunks = ByteArray(0).atomic()
    private var rawResponse = CompletableDeferred<Response>()

    suspend fun awaitResponse(): Response {
        return rawResponse.await().share()
    }

    override fun URLSession(
        session: NSURLSession,
        task: NSURLSessionTask,
        didCompleteWithError: NSError?
    ) {
        val response = task.response as NSHTTPURLResponse
        completed(response.statusCode,didCompleteWithError as? Error)
    }

    fun completed(code: Long, error: Error?) {
        val content = chunks.value.string()

        if (!rawResponse.isCompleted) {

            NSLog("completed: %s",content)
            rawResponse.complete(Response(code, content, error))
            clearChunks()
        } else {
            NSLog("already completed:")
        }
    }

Так как такой код мы можем вызвать только в корутине, то модернизируем и запрос:

class HttpDefferedEngine {

    val engineJob = SupervisorJob()
    val engineScope: CoroutineScope = CoroutineScope(defaultDispatcher 
                                                     + engineJob)

    suspend fun request(request: Request): Response {
       val reader = DefferedResponseReader()
        val urlSession =
            NSURLSession.sessionWithConfiguration(
                NSURLSessionConfiguration.defaultSessionConfiguration, responseReader,//.share(),
                delegateQueue = NSOperationQueue.currentQueue()
            )

  /*....**/
        val task = urlSession.share().dataTaskWithRequest(urlRequest)
        engineScope.launch {
            task?.resume()
        }
        val response = responseReader.awaitResponse()
        return response
    }
}

Так как заморозка вызывается до отправки в скоуп и вынесена на более низкий уровень взаимодействия, то проблема, с которой мы столкнулись ранее, у нас уже решена, и исключения не возникает.

Теперь избавимся от атомарных ссылок. Для этого мы можем использовать следующее API:

   private var chunks = Channel<ByteArray>(UNLIMITED)
    //либо
   private var chunksFlow = MutableStateFlow(ByteArray(0))

В случае channel модифицируем получение данных и отправку в ответе так:

 suspend fun awaitResponse(): Response {
        var array = ByteArray(0)
        var response = rawResponse.await()
        chunks.consumeEach {
            array += it
        }

       response.content = array.string()
        return response.share() // помним о заморозке
    }

 private fun updateChunks(data: NSData) {
        val bytes = data.toByteArray()
        scope.launch {
            chunks.send(bytes)
        }
    }

В случае flow вот так:

 suspend fun awaitResponse(): Response {
        var chunks = ByteArray(0)

        chunksFlow.onEach {
            chunks += it
        }.launchIn(scope)
        val response = rawResponse.await()
        response.content = chunks.string()
        return response.share()
    }
 
  private fun updateChunks(data: NSData) {
        val bytes = data.toByteArray().share()
        chunksFlow.tryEmit(bytes)
    }

Кстати, MutableStateFlow является приемлемой альтернативой MutableLiveData, которую мы спокойно можем использовать в Kotlin Native.

При вызове на стороне общего модуля проблем у нас вообще не возникнет:

class MoviesListViewModel() : BaseViewModel(ioDispatcher) {
    private val service = MoviesService.instance
    val moviesList: MutableStateFlow<List<MoviesItem>> = 
  MutableStateFlow(emptyList())

    fun loadMovies() {
        scope.launch {
            val result = service.loadMovies()
            moviesList.value = result.content?.results ?: arrayListOf()
        }
    }

Но вот прямо на стороне iOS (нативного приложения) вызов флоу выглядит странновато:

 flow.collect(collector: <#T##Kotlinx_coroutines_coreFlowCollector#>, 
       completionHandler: <#T##(KotlinUnit?, Error?) -> Void#>)

Необходимо реализовать специальный коллектор, с помощью которого собирать приходящие значения:

class Collector<T>: Kotlinx_coroutines_coreFlowCollector {

    let callback:(T) -> Void

    init(callback: @escaping (T) -> Void) {
        self.callback = callback
    }


    func emit(value: Any?, completionHandler: @escaping (KotlinUnit?, Error?) -> Void) {
        // do whatever you what with the emitted value
        callback(value as! T)
 //Значения иногда теряются
        completionHandler(KotlinUnit(), nil) 
    }
}

Однако, если сконфигурировать обработку неверно, сигналы могут теряться.

Поэтому на стороне IOS полезно сделать вот такую обработку Flow с помощью расширения-обертки на стороне общего модуля:

class AnyFlow<T>(source: Flow<T>): Flow<T> by source {
    fun collect(onEach: (T) -> Unit, onCompletion: (cause: Throwable?) -> Unit): Cancellable {
        val scope = CoroutineScope(SupervisorJob() + Dispatchers.Main)

        scope.launch {
            try {
                collect {
                    onEach(it)
                }

                onCompletion(null)
            } catch (e: Throwable) {
                onCompletion(e)
            }
        }

        return object : Cancellable {
            override fun cancel() {
                scope.cancel()
            }
        }
    }
}

Получаемый поток можно слушать, как обычный suspend на стороне iOS:

 someFlow().collect { value->
   // do with value
 } onCompletion {
   print("Completed") 
 }

Получилось насыщенно. Какие-то вещи удобнее делать с помощью Kotlin Coroutines, какие-то проще делать без корутин, но нативным API Kotlin Native.
Остается сравнить с тем, что у нас появилось в новой модели управления памятью, о чем смотрите в следующей части.

И полезные ссылки:

Kotlin Multiplatform. Advanced multithreading by Anna Zharkova | KotLand