Kotlin Native. Работаем с потоками с корутинами и без. Ч1
Когда мы работаем с Kotlin и Kotlin Multiplatform, то самым и простым удобным способом для настройки работы с многопоточностью в приложении являются Kotlin Coroutines. Наша задача сводится к настройке скоупов CoroutineScope для запуска корутин и suspend функций в основном потоке и фоновом. Т.к. в разных платформенных версиях языка Kotlin этот механизм реализуется по-разному, то необходимо кастомизировать получение контекста корутин с помощью expect/actual:
expect val defaultDispatcher: CoroutineContext
expect val uiDispatcher: CoroutineContext
В случае Kotlin JVM и Android у нас проблем нет (у Kotlin JVM вообще проблем нет, поэтому мы его и не рассматриваем) :
actual val uiDispatcher: CoroutineContext
get() = Dispatchers.Main
actual val defaultDispatcher: CoroutineContext
get() = Dispatchers.Default
Мы просто используем доступные диспетчеры корутин Main и Default (можно взять IO).
Но в случае iOS и Kotlin Native не все так просто. Мы, конечно, можем использовать те же самые диспетчеры Main и Default, но при переключении контекста запроса мы получим исключение:
Дело в том, что для того, чтобы мы могли передавать изменяемые блоки кода и объекты между потоками, нам нужно их замораживать перед передачей с помощью команды freeze() . Делать это, конечно, удобнее при скрытом переключении контекста. Поэтому мы создадим свои диспетчеры, которые будут обращаться к нужным DispatchQueue, и будем использовать их как контексты корутин:
actual val defaultDispatcher: CoroutineContext
get() = IODispatcher
actual val uiDispatcher: CoroutineContext
get() = MainDispatcher
private object MainDispatcher: CoroutineDispatcher(){
override fun dispatch(context: CoroutineContext, block: Runnable) {
dispatch_async(dispatch_get_main_queue()) {
try {
block.run().freeze()
}catch (err: Throwable) {
throw err
}
}
}
}
//Background
private object IODispatcher: CoroutineDispatcher(){
override fun dispatch(context: CoroutineContext, block: Runnable) {
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT.toLong(),
0.toULong())) {
try {
block.run().freeze()
}catch (err: Throwable) {
throw err
}
}
}
Но если для основного потока использование такого MainDispatcher корректно, для для фонового потока мы не сможем работать с dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT.toLong(), 0.toULong()), потому что эта очередь не привязана ни к одному потоку. Поэтому нам стоит поменять вызов на MainDispatcher для обоих контестов.
actual val defaultDispatcher: CoroutineContext
get() = MainDispatcher
actual val uiDispatcher: CoroutineContext
get() = MainDispatcher
@ThreadLocal
private object MainDispatcher: CoroutineDispatcher(){
override fun dispatch(context: CoroutineContext, block: Runnable) {
dispatch_async(dispatch_get_main_queue()) {
try {
block.run().freeze()
}catch (err: Throwable) {
throw err
}
}
}
Аннотация ThreadLocal используется для корректного шаринга неизменяемого объекта (синглтона) между разными потоками.
Кого-то может смутить корректность получившегося решения. Но так советуют разработчики технологии:
В качестве примера корректности такого поведения разбирается асинхронная работа с сетевыми запросами через библиотеку Ktor. Действительно большинство случаев, когда нам в мобильном приложении нужно работать с разными потоками, сводятся к сетевым запросам. И при разработке с помощью Kotlin Multiplatform в основном все мы используем готовую библиотеку Ktor, которая уже реализует всю асинхронную работу под капотом.
А что же нам делать, если наша асинхронная работа не связана с сетевыми запросами, и/или мы не хотим использовать Ktor? Да, многопоточные корутины под Kotlin/Native не поддерживаются (вернее, поддерживаются, но не в основных ветках, а, например, реализациях “1.5.0-native-mt”), но нам же надо что-то делать.
Можем, конечно, попробовать осуществлять маршрутизацию в GlobalScope, ведь он позволяет выполнять корутины именно в фоне. Но это не рекомендованное решение, потому что невозможно отследить выполнение такой корутины и управлять ею. Также это создает потенциальные утечки памяти. Поэтому рекомендуется собирать ссылки на job, которые мы запускаем в GlobalScope, и ожидать их окончания.
//Не рекомендовано
for i in 0..10 {
GlobalScope.launch {
work(i)
}
}
//Workaround
val jobs = mutableListOf<Job>()
for i in 0..10 {
jobs += GlobalScope.launch {
work(i)
}
}
jobs.forEach{ it.join() }
Что же нам делать в таком случае? Обратимся к опыту предшественников, т.е Ktor, а также документации по многопоточности в Kotlin Native.
Все, чего нам не хватает при работе с корутинами и скоупами корутин в Kotlin Native, делается некоторыми другими средствами, доступными в самом Kotlin Native из-под коробки. Давайте же разберем их, а заодно и посмотрим, как написать свой сетевой клиент под iOS именно средствами Kotlin Native.
В данном случае будем делать разные реализации классов сетевых клиентов под iOS и Android не с помощью expect/actual, а путем создания классов с общим интерфейсом, через который мы и будем к нашим клиентам обращаться:
interface IHttpClient {
fun request(request: Request, completion: (Response)->Unit)
}
//Android with OKHttp
class HttpClientAndroid: IHttpClient {
/**...*/
}
//iOS with NSUrlSession
class HttpClientIOS: IHttpClient {
/**...*/
}
Для iOS будем использовать тот же механизм NSURLSession, что и при нативной разработке, т.к весь Network фреймворк нам доступен в нативном модуле IOS общей части. Единственное, что из-за смены контекстов мы можем работать с сессией только через делегат:
class HttpEngine : ResponseListener {
private var completion: ((Response) -> Unit)? = null
fun request(request: Request, completion: (Response) -> Unit) {
this.completion = completion
val urlSession =
NSURLSession.sessionWithConfiguration(...)
val urlRequest =
NSMutableURLRequest(NSURL.URLWithString(request.url)!!)
// background
val task = urlSession.share().dataTaskWithRequest(
urlRequest)
task?.resume()
}
override fun receiveData(data: NSData) {
/**
Main block
*/
}
В качестве решения для выполнения кода в фоновом потоке мы рассмотрим Worker. Worker является механизмом Kotlin Native для работы с многопоточностью из-под коробки. Каждый worker представляет собой некую очередь работы, с помощью которой можно выполнять некоторые задачи в отдельных потоках. И на каждый worker создается свой отдельный поток:
internal fun background(block: () -> (Any?)) {
val future = worker.execute(TransferMode.SAFE, { block.share() }) {
it()
}
collectFutures.add(future)
}
private val worker = Worker.start()
private val collectFutures = mutableListOf<Future<*>>()
Future создается при запуске некоторого функционального блока, который мы передаем в worker.execute для выполнения в producer. Мы также можем отследить выполнение future с помощью сохранения в коллекцию ссылки на него (главное, не забыть потом очистить). Чтобы наш блок выполнялся потокобезопасно, используем параметр TransferMode.SAFE и используем расширение для заморозки нашего блока:
internal fun <T> T.share(): T {
return this.freeze()
}
Также мы можем получить из нашего future результат нашей работы с помощью consume и отправить его в некоторый callback-блок:
internal fun background(block: () -> (Any?), callback: (Any?)->Unit) {
val future = worker.execute(TransferMode.SAFE, { block.share() }) {
it()
}
future.consume {
main {
callback(it)
}
}
collectFutures.add(future)
}
Мы можем вызвать выполнение нашего callback и в главном потоке. Для этого создаем обертку, где мы DispatchQueue.Main будем вызывать наш замороженный блок:
internal fun main(block:()->Unit) {
block.share().apply {
val freezedBlock = this
dispatch_async(dispatch_get_main_queue()) {
freezedBlock()
}
}
}
Еще один момент, прежде, чем мы применим наш worker к коду. Чтобы мы не столкнулись с InvalidMutabilityException: mutation attempt of frozen, нам потребуется заморозить практически все параметры, которые мы используем для нашего сетевого запроса:
fun request(request: Request, completion: (Response) -> Unit) {
/**....*/
val urlSession =
NSURLSession.sessionWithConfiguration(
NSURLSessionConfiguration.defaultSessionConfiguration,
responseReader.share(),
delegateQueue = NSOperationQueue.currentQueue()
)
/**....*/
background {
val task = urlSession.share().dataTaskWithRequest(urlRequest)
task?.resume()
}
}
И теперь нам надо как-то собирать ответ, который может прийти не весь сразу, а порциями, в массив байтов:
private var chunks = ByteArray(0)
override fun receiveData(data: NSData) {
updateChunks(data)
}
private fun updateChunks(data: NSData) {
chunks += data.toByteArray() //!CRASH!!!!
}
Но тут мы получим краш с исключением заморозки. Просто потому, что при работе с замороженными блоками мы заморозили и сетевой клиент, и все его свойства и переменные. И изменить наш массив данных мы просто так не можем.
На помощь нам приходит API AtomicReference. Это не костыль, а полноценное, рекомендуемое и рабочее решение. AtomicReference держит ссылку на замороженный объект, который можно изменять. Для этого добавим расширение, превращающее наши объекты в атомарные, и изменим код:
internal fun <T> T.atomic(): AtomicReference<T>{
return AtomicReference(this.share())
}
//iOS Client
private val chunks = ByteArray(0).atomic()
private fun updateChunks(data: NSData) {
var newValue = ByteArray(0)
newValue += chunks.value
newValue += data.toByteArray()
chunks.value = newValue.share()
}
Также добавим очистку AtomicReference после окончания работы, чтобы предотвратить захламление памяти и утечки:
private fun clear() {
clearChunks()
completion = null
}
private fun clearChunks() {
chunks.value = ByteArray(0).share()
}
В принципе готово. Мы сможем обращаться к нашему клиенту таким образом:
actual class HttpClient : IHttpClient {
val httpEngine = HttpEngine()
/**
* @param request наш запрос с внутренними параметрами
*/
actual override fun request(request: Request, completion: (Response) -> Unit) {
httpEngine.request(request) {
completion(it)
}
}
В итоге мы обошлись без скоупов корутин при запросе. И реализовали собственный асинхронный клиент iOS. Но это еще не все, что мы можем сделать в следующей части мы рассмотрим, как работать с DetachedObjectGraph и использовать Kotlin Flow для удобства.