Skip to content

Corrutinas y Flow

Eduardo Ignacio Jaramillo Carrasco edited this page Feb 23, 2023 · 4 revisions

Corrutinas y Flow

Corrutinas

agregamos la dependencia de la librería de estas url

Qué es una corrutina Una corrutina es un proceso que utiliza una mínima parte de un Thread
para ejecutarse, esto incluye que si un proceso queda esperando alguna
respuesta, ese recurso tomado anteriormente por la corrutina se libera y
una vez exista una respuesta, la corrutina toma nuevamente una parte de
un proceso, que no necesariamente es el mismo que el anterior.

Ejemplo

fun coroutineVariosProcesos() {
    runBlocking {
        (1..1_000_000)
            .forEach {
                launch {
                    delay(someTimeCoroutineVariosProcesos())
                    print("#")
                    println("corrutina hilo ---" + Thread.currentThread().name)

                }
            }
    }
}

En qué se diferencia una corrutina de un subproceso thread Una corrutina utiliza una mínima parte de la memoria de un Thread,
en cambio un Thread es un proceso que son limitados sobre todos en los
dispositivos móviles, en algunos casos se limitan a 8 Thread.

Ejemplo

En este ejemplo de Thread nos damos cuenta que esto genera un \
desbordamiento de memoria, ya que, está generando una cantidad \
de hilos que no cualquier dispositivo puede tener disponible.

fun threadsOutMemoryRange() {
    (1..1_000_000).forEach {
        thread {
            Thread.sleep(someTimeOutMemoryRange())
            println("#")
        }
    }
}

Por qué usar corroutinas Se usa corrutinas para optimizar al máximo los procesos en la
máquina que están corriendo, en el caso mobile esto es de suma
importancia, ya que, debemos optimizar los procesos debido a la
poca cantidad de hilos en los cuales trabajamos.

Existe una ventaja de la corrutina Si, ya que, utiliza una mínimia parte de un Thread, por lo tanto,
se puede ejecutar una cantidad infinita de corrutinas.

Qué es necesario para utilizar corrutinas

  • Tener conocimientos previos de tareas en segundo plano
  • Agregar las librerías de corrutinas

Alcances para ejecutar una corrutina

GlobalScope

  • Global Scope nos permite que cada rutina esté en ejecución \mientras la aplicación esté viva.
fun globalScope() {
    newTopic("Global Scope")
    GlobalScope.launch {
        startMessage()
        delay(someTime())
        println("Mi corrutina")
        endMessage()
    }
}

function suspend

  • Suspender una función significa que dicha función va a poder
    ser pausada mientras se espera un resultado y en lo que se espera,
    el hilo podrá ser reutilizado para otras tareas que normalmente
    se recomienda que también sean funciones suspendidas.
public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
        // if timeMillis == Long.MAX_VALUE then just wait forever like awaitCancellation, don't schedule.
        if (timeMillis < Long.MAX_VALUE) {
            cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
        }
    }
}

las funciones suspendidas se pueden llamar de otras funciones\
suspendidas o dentro de una corrutina, en este caso delay() es\
una función suspendida y necesita ser ejecuta dentro de una\
corrutina u otra función suspend.

fun suspendFunction() {
    println("Suspend")
    Thread.sleep(someTime())
    GlobalScope.launch {
        delay(someTime())
    }
}

Constructores de corrutinas

runBlocking: Este constructor va a suspender el hilo hasta
que nuestro bloque de código termine y crea una corrutina

fun cRunblocking() {
    runBlocking {
        startMessage()
        delay(someTime())
        println("runBlocking...")
        endMessage()
    }
}

launch

  • Este crea una corrutina de tipo Job, es decir, un objeto cancelable.
  • Este constructor va a ser ejecutado dentro de otra corrutina o una función suspendida.
  • Está diseñado para hacer tareas que no devuelvan ningún valor
fun cLaunch() {
    runBlocking {
        launch {
            startMessage()
            delay(someTime())
            println("launch...")
            endMessage()
        }
    }
}

Async

  • Necesita otra corrutina o función suspendida
  • Está diseñado para recibir valores de retorno y el resultado será un tipo Defered<T>
fun cAsync() {
    runBlocking {
        val result = async {
            startMessage()
            delay(someTime())
            println("Async...")
            endMessage()
            1
        }
        println("result: ${result.await()}")
    }
}

salida : 1

Ejemplo de tipo Defered<T> Para obtener el resultado de retorno se utiliza deferred.await()

fun deferred() {
    runBlocking {
        println("Deferred")
        val deferred = async {
            startMessage()
            delay(someTime())
            println("deferred ...")
            endMessage()
            multi(2,3)
        }
        println("Deferred: $deferred")

        //cabe señalar que la función println no se ejecutará hasta que termine
        // de evaluarse la función await(), ya que, await() es una función suspendida
        //public suspend fun await(): T
        println("valor del Deferred.await: ${deferred.await()}")\

        // otra forma de utilizar async con await
        val result = async {
            multi(3,3)
        }.await()

        println("Result : $result")
    }
}

Job

  • Es el ciclo de vida de una corrutina, es decir, un trabajo en
    segundo plano que puede ser cancelable
    • estados del job: Activo, Cancelado, Completado
fun job() {
    runBlocking {
        val job = launch {
            startMessage()
            delay(2_100)
            println("job...")
            endMessage()
        }
        println("job---- $job")
        println("isActive----${job.isActive}")
        println("isCancel----${job.isCancelled}")
        println("isComplete----${job.isCompleted}")

        delay(someTime())
        println("Tarea cancelada o interrumpida")
        job.cancel()

        println("isActive----${job.isActive}")
        println("isCancel----${job.isCancelled}")
        println("isComplete----${job.isCompleted}")
    }
}

Produce

  • Este constructor forma parte de un patrón de diseño productor - consumidor
fun cProduce() = runBlocking{
    val names = produceNames()
    names.consumeEach {
        println(it)
    }
}

fun CoroutineScope.produceNames(): ReceiveChannel<String> = produce {
    (1..5).forEach{
        send("name $it")
    }
}

Dispatchers

  • Nos ayudan a definir donde vamos a ejecutar los hilos de las corrutinas
    • IO: ideal para conexiones a base de datos locales o remotas, además para escrita o lectura de archivos
fun dispatchers() {
    runBlocking {
        launch {
            startMessage()
            println("none")
            endMessage()
        }
        launch(Dispatchers.IO) {
            startMessage()
            println("IO")
            endMessage()
        }
    }
}
  • Main : Este está disponible para Android, es el hilo principal conectado a la interfaz del usuario y se recomienda usar para tareas muy rápidas o que está relacionadas con el cambio en la interfaz
launch(Dispatchers.Main) {
            startMessage()
            println("Main")
            endMessage()
        }
  • Default : Es para uso intensivo de la cpu
launch(Dispatchers.Default) {
            startMessage()
            println("Default")
            endMessage()
        }
  • Custom Dispatcher : se puede crear un propio dispatcher, y lo recomendan para hacer debugging
launch(newSingleThreadContext("Custom Dispatcher Lalin"){
    startMessage()
    println("corrutina personalizada con un Dispatcher")
    endMessage()
}
  • Job Anidados :
fun nestedDispatcher() {
    runBlocking {
        val job = launch {
            startMessage()

            launch {
                startMessage()
                delay(someTime())
                println("Otra tarea")
                endMessage()
            }
            launch(Dispatchers.IO) {
                startMessage()

                launch(newSingleThreadContext("Custom Dispatcher Edu")) {
                    startMessage()
                    println("Tarea con custom dispatcher edu")
                    endMessage()
                }

                delay(someTime())
                println("Tarea en el servidor")
                endMessage()
            }

            var sum = 0
            (1..100).forEach {
                sum += it
                delay(someTime() / 100)
            }
            println("Sum= $sum")
            endMessage()
        }
        delay(someTime() / 2)
        job.cancel()
        println("Job Cancelado...")
    }
}
  • WithContext() : sirve para cambiar el contexto de un bloque de código dentro de una corrutina
fun changeWithContext() {
    runBlocking {
        startMessage()

        withContext(newSingleThreadContext("Custom Dispatchers Edu")){
            startMessage()
            delay(someTime())
            println("Dentro Custom Dispatchers Edu")
            endMessage()
        }

        withContext(Dispatchers.IO){
            startMessage()
            delay(someTime())
            println("petición al servidor")
            endMessage()
        }

        println("Dentro de la corrutina principal")

        endMessage()
    }
}

Flow

  • Como primer acercamiento analizaremos las secuencias(sequence); es una colección que se encarga de procesar y entregar valores por pasos, donde puede ejecutar un procesamiento por cada elemento y ser perezosa, es decir, que procesará cada elemento hasta que se solicite y no toda la colección en una sola acción.
fun getDatabySeq(): Sequence<Float>{
    return sequence {
        (1..5).forEach {
            println("procesado datos ...")
            Thread.sleep(someTime())
            yield(20 +it + Random.nextFloat())
        }
    }
}

yield: Este se encarga de producir un valor final al consumidor
forEach: este procesa la información devuelta por la función yield.

fun secuences() {
    getDatabySeq()
        .forEach {
            println("${it}°")
        }

}

  • Flow: sirve resolver aquellos casos donde existe código asíncrono que retorna múltiples valores
fun getDataByFlow(): Flow<Float>{
    return flow {
        (1..5).forEach {
            println("procesado datos ...")
            delay(someTime())
            emit(20 +it + Random.nextFloat())
        }
    }
}

la función collect es una función suspend, por eso se puede ejecutar este bloque de código.

fun basicFlow() {
    runBlocking {
        launch {
            getDataByFlow()
                .collect{
                    println(it)
                }
        }

        launch {
            (1..50).forEach {
                delay(someTime())
                println("Tarea 2...")
            }
        }
    }
}
  • Cold Flow: Significa que no se va a ejecutar su código interno hasta que no se llame su método collect()
fun coldFlow() {
    runBlocking {
        val dataFlow = getDataByFlow()
        println("esperando...")
        delay(someTime())
        dataFlow.collect{
            println(it)
        }
    }
}

//para cancelar un flow

fun cancelFlow() {
    runBlocking {
        val job = launch {
            getDataByFlow()
                .collect {
                    println(it)
                }
        }
        delay(someTime() * 2)
        job.cancel()
    }
}

Operadores Flow Intermediarios:

- Map: la ventaja de este operador es que se puede\n ejecutar una función suspendida dentro de su bloque de código
    fun flowOperatorMap() {
        runBlocking {
            getDataByFlow()
                .map {
                    //setFormat(it)
                    setFormat(convertCelsToFahr(it),"F")
                }
                .collect{
                    println(it)
                }
        }
    }

    fun convertCelsToFahr(cels: Float): Float =
        ((cels * 9) / 5) + 32

fun setFormat(temp: Float, degree: String = "C"): String=
   String.format(Locale.getDefault(),
    "%.1f°$degree", temp)
  • Filter: filtra datos
fun flowOperatorFilter() {
    runBlocking {
        getDataByFlow()
            .filter {
                it < 32
            }
            .map {
                setFormat(it)
            }
            .collect{
                println(it)
            }
    }
}
  • Transform: tiene la característica que se va a trasmitir almenos un valor, se puede trasmitir un segundo valor, en el ejemplo es el segundo emit. \También cuando requerimos múltiples procesamientos.
fun flowOperatorTransform() {
    runBlocking {
        getDataByFlow()
            .transform {
                emit(it)
                emit(setFormat(convertCelsToFahr(it),"F"))
            }
            .collect{
                println(it)
            }
    }
}
  • Take: Sirve para limitar los resultados que recibe el usuario
fun flowOperatorTake() {
    runBlocking {
        getDataByFlow()
            .take(3)
            .map { setFormat(it) }
            .collect{
                println(it)
            }
    }
}

Operadores de flow finales o terminales

  • toList() y single()
fun flowOperatorListAndSingle() {
    runBlocking {
        val list = getDataByFlow()
            .toList()
        println("list $list")

        val single = getDataByFlow()
            .take(1)
            .single()
        println("single: $single")
    }
}
  • first() and last(): first() para la ejecución una vez tiene el primer valor y
    y last() espera hasta el último elemento emitido para retornarlo
fun flowOperatorFirstAndLast() {
    runBlocking {
        val first = getDataByFlow()
            .first()
        println("first $first")

        val last = getDataByFlow()
            .last()
        println("last $last")
    }
}
  • reduce: trabaja como un acumulador
fun flowOperatorReduce() {
    runBlocking {
        val saving = getDataByFlow()
            .reduce { accumulator, value ->
                println("Acumulator: $accumulator")
                println("Value: $value")
                println("Current Saving: ${accumulator + value}")
                println()
                accumulator + value
            }
        println(saving)
    }
}
  • fold: toma un acumulador inicial
//en este caso val saving está guardando el valor inicial para utilizarlo en el fold.

fun flowOperatorFold() {
    runBlocking {
        val saving = getDataByFlow()
            .reduce { accumulator, value ->
                println("reduce.Acumulator: $accumulator")
                println("reduce.Value: $value")
                println("reduce.Current Saving: ${accumulator + value}")
                println()
                accumulator + value
            }

        val lastSaving = getDataByFlow()
            .fold(saving,{acc, value ->
                println("fold.Acumulador: $acc")
                println("fold.Value: $value")
                println("fold.Current saving: ${acc + value}")
                acc + value
            })
        println(lastSaving)
    }
}
  • Buffer: sirve para juntar al máximo posible los procesos, respetando la integridad del flujo
fun flowOperatorBuffer() {
    runBlocking {
        val time = measureTimeMillis {
            getDataByFlowStatic()
                .map {
                    setFormat(it)
                }
                .buffer()
                .collect{
                    delay(500)      //000111222333444
                    println(it)     //   0000011111222223333344444
                }
        }
        println()
        println("Time: ${time}ms")
    }
}

// resultados: sin el buffer demora como 4 segundos
// con el buffer() demora como 2.5 segundos.

fun getDataByFlowStatic(): Flow<Float> {
    return flow {
        (1..5).forEach {
            println("procesado datos ...")
            delay(300)
            emit(20 + it + Random.nextFloat())
        }
    }
}
```kotlin
- Conflate: este solo toma el último valores del emit.

```kotlin
fun flowOperatorConflate() {
    runBlocking {
        val time = measureTimeMillis {
            getMatchResultsFlow()
                .conflate()
                .collect{
                    delay(100)
                    println(it)
                }
        }

        println("Time: $time")
    }
}

fun getMatchResultsFlow(): Flow<String>{
    return flow {
        var homeTeam = 0
        var awayTeam = 0
        (0..45).forEach {
            println("minuto: $it")
            delay(50)
            homeTeam += Random.nextInt(0,21) / 20
            awayTeam += Random.nextInt(0,21) / 20
            emit("$homeTeam - $awayTeam")
        }
    }
}
  • Zip y Combine.

    • (Zip): es una forma de mezclar dos flujos y el resultado\ se conoce como composición de flujo, es flujo es tan largo como el \flujo del menor tamaño
    fun flowOperatorZipAndCombine() {
        runBlocking {
            getDataByFlowStatic()
                .map { setFormat(it) }
                .zip(getMatchResultsFlow()){degress, result ->
                    "$result with $degress"
                }
                .collect{
                    println(it)
                }
        }
    }
    
    fun getDataByFlowStatic(): Flow<Float> {
        return flow {
            (1..5).forEach {
                println("procesado datos ...")
                delay(300)
                emit(20 + it + Random.nextFloat())
            }
        }
    }
    
    fun getMatchResultsFlow(): Flow<String>{
        return flow {
            var homeTeam = 0
            var awayTeam = 0
            (0..45).forEach {
                println("minuto: $it")
                delay(50)
                homeTeam += Random.nextInt(0,21) / 20
                awayTeam += Random.nextInt(0,21) / 20
                emit("$homeTeam - $awayTeam")
            }
        }
    }
    • (Combine): Este sí mezcla todo el flujo del mas grande.
    fun flowOperatorZipAndCombine() {
        runBlocking {
            getDataByFlowStatic()
                .map { setFormat(it) }
                .combine(getMatchResultsFlow()){degress, result ->
                    "$result with $degress"
                }
                /*.zip(getMatchResultsFlow()){degress, result ->
                    "$result with $degress"
                }*/
                .collect{
                    println(it)
                }
        }
    }
  • (FlatMapConcat and FlatMapMerge): Flujos de aplanamiento diagramas de los operadores flat

fun flowFrom(elem: String) = flowOf(1, 2, 3)
    .onEach { delay(2000) }
    .map { "${elem}_${it}" }

// flat map concat 
flowOf("A", "B", "C")
            .flatMapConcat { flowFrom(it) }
            .collect { println(it) }

// flat map latest
flowOf("A", "B", "C")
            .onEach { delay(1000) }
            .flatMapLatest { flowFrom(it) }
            .collect { println(it) }

// flat map merge
flowOf("A", "B", "C")
            .flatMapMerge() { flowFrom(it) }
            .collect { println(it) }

fun flowFrom(elem: String) = flowOf(1, 2, 3)
    .onEach { delay(2000) }
    .map { "${elem}_${it}" }

 fun flowOperatorFlatMapConcatAndFlatMapMerge() {
    runBlocking {
        val milis = measureTimeMillis {
            getCitiesFlow()
                //.buffer()
                .flatMapMerge { city -> // Flow<Flow<TYPE>>
                    println("flatMapMerge. $city")
                    getDataToFlatFlow(city)
                }
                /*.flatMapConcat { city-> // Flow<Flow<TYPE>>
                getDataToFlatFlow(city)
            }*/
                //.map { setFormat(it) }
                .collect { cities ->
                    println(cities)
                }
        }
        println("milis :::: $milis")
    }
}

fun getDataToFlatFlow(city: String): Flow<Float> = flow {
    (1..3)
        .forEach {
            println("Temperatura de ayer en $city...")
            emit(Random.nextInt(10, 30).toFloat())

            println("Temperatura actual en $city")
            delay(500)
            emit(20 + it + Random.nextFloat())
        }
}

fun getCitiesFlow(): Flow<String> = flow {
    listOf("Santander", "CDMX", "Lima")
        .forEach { city ->
            delay(100)
            println("\nConsultando Ciudad $city---")
            emit(city)
        }
}
Clone this wiki locally