Количество просмотров190
12 сентября 2021

Kotlin Native. Работаем с потоками с корутинами и без. Ч1

Когда мы работаем с Kotlin и Kotlin Multiplatform, то самым и простым удобным способом для настройки работы с многопоточностью в приложении являются Kotlin Coroutines. Наша задача сводится к настройке скоупов CoroutineScope для запуска корутин и suspend функций в основном потоке и фоновом. Т.к. в разных платформенных версиях языка Kotlin этот механизм реализуется по-разному, то необходимо кастомизировать получение контекста корутин с помощью expect/actual:

expect val defaultDispatcher: CoroutineContext

expect val uiDispatcher: CoroutineContext

В случае Kotlin JVM и Android у нас проблем нет (у Kotlin JVM вообще проблем нет, поэтому мы его и не рассматриваем) :

actual val uiDispatcher: CoroutineContext
    get() = Dispatchers.Main

actual val defaultDispatcher: CoroutineContext
    get() = Dispatchers.Default

Мы просто используем доступные диспетчеры корутин Main и Default (можно взять IO).

Но в случае iOS и Kotlin Native не все так просто. Мы, конечно, можем использовать те же самые диспетчеры Main и Default, но при переключении контекста запроса мы получим исключение:

Дело в том, что для того, чтобы мы могли передавать изменяемые блоки кода и объекты между потоками, нам нужно их замораживать перед передачей с помощью команды freeze() . Делать это, конечно, удобнее при скрытом переключении контекста. Поэтому мы создадим свои диспетчеры, которые будут обращаться к нужным DispatchQueue, и будем использовать их как контексты корутин:

actual val defaultDispatcher: CoroutineContext
get() = IODispatcher

actual val uiDispatcher: CoroutineContext
get() = MainDispatcher

private object MainDispatcher: CoroutineDispatcher(){
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        dispatch_async(dispatch_get_main_queue()) {
            try {
                block.run().freeze()
            }catch (err: Throwable) {
                throw err
            }
        }
    }
}
//Background
private object IODispatcher: CoroutineDispatcher(){
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT.toLong(),
0.toULong())) {
            try {
                block.run().freeze()
            }catch (err: Throwable) {
                throw err
            }
        }
    }

Но если для основного потока использование такого MainDispatcher корректно, для для фонового потока мы не сможем работать с dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT.toLong(), 0.toULong()), потому что эта очередь не привязана ни к одному потоку. Поэтому нам стоит поменять вызов на MainDispatcher для обоих контестов.

actual val defaultDispatcher: CoroutineContext
get() = MainDispatcher

actual val uiDispatcher: CoroutineContext
get() = MainDispatcher


@ThreadLocal
private object MainDispatcher: CoroutineDispatcher(){
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        dispatch_async(dispatch_get_main_queue()) {
            try {
                block.run().freeze()
            }catch (err: Throwable) {
                throw err
            }
        }
    }

Аннотация ThreadLocal используется для корректного шаринга неизменяемого объекта (синглтона) между разными потоками.

Кого-то может смутить корректность получившегося решения. Но так советуют разработчики технологии:

В качестве примера корректности такого поведения разбирается асинхронная работа с сетевыми запросами через библиотеку Ktor. Действительно большинство случаев, когда нам в мобильном приложении нужно работать с разными потоками, сводятся к сетевым запросам. И при разработке с помощью Kotlin Multiplatform в основном все мы используем готовую библиотеку Ktor, которая уже реализует всю асинхронную работу под капотом.

А что же нам делать, если наша асинхронная работа не связана с сетевыми запросами, и/или мы не хотим использовать Ktor? Да, многопоточные корутины под Kotlin/Native не поддерживаются (вернее, поддерживаются, но не в основных ветках, а, например, реализациях “1.5.0-native-mt”), но нам же надо что-то делать.

Можем, конечно, попробовать осуществлять маршрутизацию в GlobalScope, ведь он позволяет выполнять корутины именно в фоне. Но это не рекомендованное решение, потому что невозможно отследить выполнение такой корутины и управлять ею. Также это создает потенциальные утечки памяти. Поэтому рекомендуется собирать ссылки на job, которые мы запускаем в GlobalScope, и ожидать их окончания.

//Не рекомендовано
for i in 0..10 {
GlobalScope.launch {
   work(i)
}
}

//Workaround 
val jobs = mutableListOf<Job>()
for i in 0..10 {
  jobs += GlobalScope.launch {
    work(i)
  }
}
jobs.forEach{ it.join() }

Что же нам делать в таком случае? Обратимся к опыту предшественников, т.е Ktor, а также документации по многопоточности в Kotlin Native.


Все, чего нам не хватает при работе с корутинами и скоупами корутин в Kotlin Native, делается некоторыми другими средствами, доступными в самом Kotlin Native из-под коробки. Давайте же разберем их, а заодно и посмотрим, как написать свой сетевой клиент под iOS именно средствами Kotlin Native.


В данном случае будем делать разные реализации классов сетевых клиентов под iOS и Android не с помощью expect/actual, а путем создания классов с общим интерфейсом, через который мы и будем к нашим клиентам обращаться:

interface IHttpClient {
    fun request(request: Request, completion: (Response)->Unit)
}

//Android with OKHttp
class HttpClientAndroid: IHttpClient {
/**...*/
}

//iOS with NSUrlSession
class HttpClientIOS: IHttpClient { 
/**...*/ 
}  

Для iOS будем использовать тот же механизм NSURLSession, что и при нативной разработке, т.к весь Network фреймворк нам доступен в нативном модуле IOS общей части. Единственное, что из-за смены контекстов мы можем работать с сессией только через делегат:

class HttpEngine : ResponseListener {
    private var completion: ((Response) -> Unit)? = null

    fun request(request: Request, completion: (Response) -> Unit) {
        this.completion = completion
 
       val urlSession =
            NSURLSession.sessionWithConfiguration(...)

        val urlRequest =
            NSMutableURLRequest(NSURL.URLWithString(request.url)!!)

       // background
            val task = urlSession.share().dataTaskWithRequest(
              urlRequest)
            task?.resume()
    }

   override fun receiveData(data: NSData) {
      /**
      Main block
      */
    }

В качестве решения для выполнения кода в фоновом потоке мы рассмотрим Worker. Worker является механизмом Kotlin Native для работы с многопоточностью из-под коробки. Каждый worker представляет собой некую очередь работы, с помощью которой можно выполнять некоторые задачи в отдельных потоках. И на каждый worker создается свой отдельный поток:

internal fun background(block: () -> (Any?)) {
    val future = worker.execute(TransferMode.SAFE, { block.share() }) {
        it()
    }
    collectFutures.add(future)
}
private val worker = Worker.start()
private val collectFutures = mutableListOf<Future<*>>()

Future создается при запуске некоторого функционального блока, который мы передаем в worker.execute для выполнения в producer. Мы также можем отследить выполнение future с помощью сохранения в коллекцию ссылки на него (главное, не забыть потом очистить). Чтобы наш блок выполнялся потокобезопасно, используем параметр TransferMode.SAFE и используем расширение для заморозки нашего блока:

internal fun <T> T.share(): T {
    return this.freeze()
}

Также мы можем получить из нашего future результат нашей работы с помощью consume и отправить его в некоторый callback-блок:

internal fun background(block: () -> (Any?), callback: (Any?)->Unit) {
    val future = worker.execute(TransferMode.SAFE, { block.share() }) {
        it()
    }
    future.consume {
        main {
            callback(it)
        }
    }
    collectFutures.add(future)
}

Мы можем вызвать выполнение нашего callback и в главном потоке. Для этого создаем обертку, где мы DispatchQueue.Main будем вызывать наш замороженный блок:

internal fun main(block:()->Unit) {
    block.share().apply {
        val freezedBlock = this
        dispatch_async(dispatch_get_main_queue()) {
            freezedBlock()
        }
    }
}

Еще один момент, прежде, чем мы применим наш worker к коду. Чтобы мы не столкнулись с InvalidMutabilityException: mutation attempt of frozen, нам потребуется заморозить практически все параметры, которые мы используем для нашего сетевого запроса:

fun request(request: Request, completion: (Response) -> Unit) {
  /**....*/
   val urlSession =
            NSURLSession.sessionWithConfiguration(
                NSURLSessionConfiguration.defaultSessionConfiguration, 
              responseReader.share(),
                delegateQueue = NSOperationQueue.currentQueue()
            )

/**....*/
        background {
            val task = urlSession.share().dataTaskWithRequest(urlRequest)
            task?.resume()
        }
    }

И теперь нам надо как-то собирать ответ, который может прийти не весь сразу, а порциями, в массив байтов:

 private var chunks = ByteArray(0)
 
  override fun receiveData(data: NSData) {
        updateChunks(data)
    }

    private fun updateChunks(data: NSData) {
        chunks += data.toByteArray() //!CRASH!!!!
    }
  

Но тут мы получим краш с исключением заморозки. Просто потому, что при работе с замороженными блоками мы заморозили и сетевой клиент, и все его свойства и переменные. И изменить наш массив данных мы просто так не можем.

На помощь нам приходит API AtomicReference. Это не костыль, а полноценное, рекомендуемое и рабочее решение. AtomicReference держит ссылку на замороженный объект, который можно изменять. Для этого добавим расширение, превращающее наши объекты в атомарные, и изменим код:

internal fun <T> T.atomic(): AtomicReference<T>{
    return AtomicReference(this.share())
}

//iOS Client
private val chunks = ByteArray(0).atomic()

private fun updateChunks(data: NSData) {
        var newValue = ByteArray(0)
        newValue += chunks.value
        newValue += data.toByteArray()
        chunks.value = newValue.share()
  }

Также добавим очистку AtomicReference после окончания работы, чтобы предотвратить захламление памяти и утечки:

 private fun clear() {
        clearChunks()
        completion = null
    }

    private fun clearChunks() {
       chunks.value = ByteArray(0).share()
    }

В принципе готово. Мы сможем обращаться к нашему клиенту таким образом:

actual class HttpClient : IHttpClient {
    val httpEngine = HttpEngine()

    /**
    * @param request наш запрос с внутренними параметрами
    */
    actual override fun request(request: Request, completion: (Response) -> Unit) {

        httpEngine.request(request) {
            completion(it)
        }
    }

В итоге мы обошлись без скоупов корутин при запросе. И реализовали собственный асинхронный клиент iOS. Но это еще не все, что мы можем сделать в следующей части мы рассмотрим, как работать с DetachedObjectGraph и использовать Kotlin Flow для удобства.

https://github.com/anioutkazharkova/kotlin_native_network_client