Implementing RxJava’s timeout() operator in Kotlin Flow

Time to time out

package com.woodblockwithoutco.flowtimeout

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.selects.select

@ExperimentalCoroutinesApi
fun <T> Flow<T>.timeout(
timeoutDelay: Long,
timeoutScope: CoroutineScope = CoroutineScope(Dispatchers.Default)
): Flow<T> {
val upstreamFlow = this

return flow {
val collector = this

// create new scope to create values channel in it
// and to confine all the children coroutines there
coroutineScope {

// reroute original flow values into a channel that will be part of select clause
val values = produce {
upstreamFlow.collect { value ->
send(TimeoutState.Value(value))
}
}

// reference to latest used timeout scope so it can be cancelled later
var latestTimeoutScope: ProducerScope<Unit>? = null

// run in the loop until we get a confirmation that flow has ended
var latestValue: TimeoutState = TimeoutState.Initial
while (latestValue !is TimeoutState.Final) {

// start waiting for timeout
val timeout = timeoutScope.produce {
latestTimeoutScope = this
delay(timeoutDelay)
send(Unit)
}

// whatever comes first decides our fate
select<Unit> {

// Two options:
// 1. We got normal value - emission from upstream, cancel timeout scope
// and emit it to downstream
//
// 2. We got null value - upstream flow was cancelled (and thus channel was closed),
// still cancel timeout and set latest value to Done marker, killing the while loop
values.onReceiveOrNull {
latestTimeoutScope?.cancel()

if (it != null) {
latestValue = it
collector.emit(it.value)
} else {
latestValue = TimeoutState.Final.Done
}
}

// we got a timeout! Set latest value to Timeout marker, killing the while loop
timeout.onReceiveOrNull {
if (it != null) {
latestValue = TimeoutState.Final.Timeout
}
}
}
}

// additional cancel in case upstream flow finished without emitting anything
latestTimeoutScope?.close()

// if latest value is a Timeout marker, throw timeout exception
if (latestValue is TimeoutState.Final.Timeout) {
throw TimeoutException()
}
}
}
}

private sealed class TimeoutState {

sealed class Final: TimeoutState() {
object Done: Final()
object Timeout: Final()
}

object Initial: TimeoutState()
data class Value<T>(val value: T): TimeoutState()
}

class TimeoutException: RuntimeException("Timed out waiting for emission")

Nitty-gritty details

val upstreamFlow = this
return flow {
...
}
private sealed class TimeoutState {

sealed class Final: TimeoutState() {
object Done: Final()
object Timeout: Final()
}

object Initial: TimeoutState()
data class Value<T>(val value: T): TimeoutState()
}
var latestValue: TimeoutState = TimeoutState.Initial
while (latestValue !is TimeoutState.Final) {
...
}
val values = produce {
upstreamFlow.collect { value ->
send(TimeoutState.Value(value))
}
}
  1. Emission from upstream
  2. Timeout while waiting for emission
select<Unit> {
values.onReceiveOrNull {
latestTimeoutScope?.cancel()
if (it != null) {
latestValue = it
collector.emit(it.value)
} else {
latestValue = TimeoutState.Final.Done
}
}

timeout.onReceiveOrNull {
if (it != null) {
latestValue = TimeoutState.Final.Timeout
}
}
}
val timeout = timeoutScope.produce {
latestTimeoutScope = this
delay(timeoutDelay)
send(Unit)
}
  1. Upstream Flow collection starts
  2. New coroutine scope is created
  3. Inside that scope, we create a channel which we send upstream values to
  4. We start while loop while saved latest value is not a subclass of TimeoutState.Final
  5. We start the timer and start waiting for an emission
  6. If emission arrives first, we cancel the timer, save latest value and re-enter while loop, going back to 5
  7. If timeout arrives, we save latest value as TimeoutState.Final.Timeout and exit the while loop
  8. If upstream Flow is done, we save latest value as TimeoutState.Final.Done and exit the while loop

Finishing touches

Closing notes

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store