LiveData & Coroutine and Flow Unit Testing
In Part I and Part II, we saw how to use LiveData with Flow, and Flow on its own, in an MVVM architecture. In this post, we will see how to test LiveData with Coroutines and how to test Flow. We will troubleshoot common race conditions between LiveData and Coroutines with TestCoroutineDispatcher. After that, we will see how easy it is to test Flow.
You can access the sample project from https://github.com/fgiris/LiveDataWithFlowSample/tree/fgiris/android_turkiye_work_around
LiveData & Coroutine Unit Testing
You may have seen your branch build locally, but every time you push it to remote, the build fails. When you check the logs, you see that one of the tests you added fails because LiveData is not giving you the correct value. But why? Is it because of LiveData or Coroutines?
One of the common problems when you test Coroutines with LiveData is race conditions. You have to be very careful with Dispatchers and Loopers. Without proper test rules for them, it is tempting to give up and say "Let's comment it out, and we can look at it later."
To prevent this from happening, let's start with a very simple function in the repository.
/**
 * This method is used to make a one-shot request to get
 * fake weather forecast data
 */
suspend fun fetchWeatherForecastSuspendCase(): Result<Int> {
    // Fake api call
    delay(1000)
    // Send a random fake weather forecast data
    return Result.Success((0..20).random())
}
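The Result type returned here is not shown in this post. A minimal sketch of what it could look like, assuming a sealed class with Loading, Success and Error cases (the Error case, the data and exception property names, and the describe helper are hypothetical; the sample project's actual type may differ):

```kotlin
// Hypothetical sketch of the Result wrapper used throughout this post.
// Declaring it in your own package shadows kotlin.Result inside that package.
sealed class Result<out T> {
    // Singleton, so `value == Result.Loading` works in the tests
    object Loading : Result<Nothing>()

    // Holds the fetched value
    data class Success<out T>(val data: T) : Result<T>()

    // Assumed error case; not used by the snippets in this post
    data class Error(val exception: Throwable) : Result<Nothing>()
}

// Small helper showing how a consumer can branch on each state
fun describe(result: Result<Int>): String = when (result) {
    is Result.Loading -> "loading"
    is Result.Success -> "forecast: ${result.data}"
    is Result.Error -> "failed: ${result.exception.message}"
}
```

Modeling Loading as an object is what lets the tests compare it with == while Success is checked with is.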
We will test whether this function returns a success value.
@Test
fun fetchWeatherForecastSuspendCase_ShouldReturnSuccess() {
    runBlocking {
        // Api call
        val weatherForecast = weatherForecastRepository.fetchWeatherForecastSuspendCase()
        // Check whether the result is successful
        assert(weatherForecast is Result.Success)
    }
}
A suspend function can only be called from a coroutine, which is why runBlocking is used here. This test function passes successfully.
What about the ViewModel? Let's move on and see how the fetchWeatherForecastSuspendCase function is used in the ViewModel.
class WeatherForecastOneShotViewModel @Inject constructor(
    private val weatherForecastRepository: WeatherForecastRepository
) : ViewModel() {
    private val _weatherForecast = MutableLiveData<Result<Int>>()
    ...
    private fun fetchWeatherForecast() {
        // Set value as loading
        _weatherForecast.value = Result.Loading
        // Uses Dispatchers.Main
        // We will fix it later in the article
        viewModelScope.launch {
            // Fetch and update weather forecast LiveData
            _weatherForecast.value = weatherForecastRepository.fetchWeatherForecastSuspendCase()
        }
    }
}
The important thing here is that the function we are going to test starts a new coroutine. We will test whether the LiveData updated by fetchWeatherForecast first holds Loading and then Success.
@Test
fun weatherForecastLiveData_ShouldPostLoadingThenSuccess() {
    runBlocking {
        // Check whether the first value is loading
        assert(viewModel.weatherForecast.value == Result.Loading)
        // Wait for the response
        delay(1000)
        // Check whether the response is successful
        assert(viewModel.weatherForecast.value is Result.Success)
    }
}
runBlocking is used because we call delay to wait for the actual response. Using delay is not a good solution, but this is a very basic example and we will improve it later.
When you run this test, you will get a RuntimeException about the main Looper.
The official documentation explains why:
The android.jar file used to run unit tests does not contain any actual code — that is provided by the Android system image on a real device. Instead, all methods throw exceptions (by default). This is to ensure that your unit tests only test your code and do not rely on specific behavior of the Android platform (that you have not explicitly mocked, for example using Mockito).
It says that there is no access to the Android system image (which includes the main Looper for your app) in unit tests. To fix this, we have two options.
- Enable default return values for unit tests in the Gradle file. (Not recommended)
- Add InstantTaskExecutorRule, which overrides the isMainThread method that would otherwise be called to get the main Looper. It also changes the background executor for Architecture Components to run tasks synchronously in tests.
@get:Rule
val instantTaskExecutorRule = InstantTaskExecutorRule()

@Test
fun weatherForecastLiveData_ShouldPostLoadingThenSuccess() {
    runBlocking {
        // Check whether the first value is loading
        assert(viewModel.weatherForecast.value == Result.Loading)
        // Wait for the response
        delay(1000)
        // Check whether the response is successful
        assert(viewModel.weatherForecast.value is Result.Success)
    }
}
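For reference, both options listed above are set up in the module's Gradle build file. A minimal sketch in the Gradle Kotlin DSL, assuming an Android module; the core-testing version shown is an assumption, so use whichever version matches your project:

```kotlin
// build.gradle.kts (module level), shown as a fragment

android {
    testOptions {
        // Option 1 (not recommended): android.jar methods return
        // default values (null/zero) instead of throwing in unit tests
        unitTests.isReturnDefaultValues = true
    }
}

dependencies {
    // Option 2: provides InstantTaskExecutorRule
    // The version number is an assumption
    testImplementation("androidx.arch.core:core-testing:2.1.0")
}
```

The first option hides every unmocked Android call behind a default value, which can mask real problems; the rule only affects Architecture Components.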
Let's run the test again after adding InstantTaskExecutorRule. It still does not pass.
...
Now it complains because the main dispatcher is not initialized. The reason is that Dispatchers.Main requires a main Looper, which is not available in unit tests. So the solution is to replace Dispatchers.Main with TestCoroutineDispatcher.
Let's write a general rule for using TestCoroutineDispatcher.
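import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.TestCoroutineDispatcher
import kotlinx.coroutines.test.resetMain
import kotlinx.coroutines.test.setMain
import org.junit.rules.TestWatcher
import org.junit.runner.Description

@ExperimentalCoroutinesApi
class MainCoroutineRule(
    val testDispatcher: TestCoroutineDispatcher = TestCoroutineDispatcher()
) : TestWatcher() {
    override fun starting(description: Description?) {
        super.starting(description)
        // Route Dispatchers.Main to the test dispatcher before each test
        Dispatchers.setMain(testDispatcher)
    }

    override fun finished(description: Description?) {
        super.finished(description)
        // Restore the original main dispatcher and check for leftover coroutines
        Dispatchers.resetMain()
        testDispatcher.cleanupTestCoroutines()
    }
}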
You can find the entire implementation of this rule here. We just set the main dispatcher to TestCoroutineDispatcher every time a test starts and reset it when the test finishes. Let's add this rule to our test as well and run it again.
@get:Rule
val mainCoroutineRule = MainCoroutineRule()

@get:Rule
val instantTaskExecutorRule = InstantTaskExecutorRule()

@Test
fun weatherForecastLiveData_ShouldPostLoadingThenSuccess() {
    mainCoroutineRule.testDispatcher.runBlockingTest {
        // Check whether the first value is loading
        assert(viewModel.weatherForecast.value == Result.Loading)
        // Wait for the response
        delay(1000)
        // Check whether the response is successful
        assert(viewModel.weatherForecast.value is Result.Success)
    }
}
Voilà! After all that, our test passes.
TestCoroutineDispatcher has runBlockingTest, which resumes a delay immediately by advancing a virtual clock instead of waiting. This makes tests faster than with runBlocking.
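To see the virtual clock in isolation, here is a minimal sketch. It assumes a kotlinx-coroutines-test version that still ships TestCoroutineDispatcher and runBlockingTest (both were deprecated in 1.6); the class and test names are hypothetical.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.TestCoroutineDispatcher
import kotlinx.coroutines.test.runBlockingTest
import org.junit.Assert.assertEquals
import org.junit.Test

@ExperimentalCoroutinesApi
class VirtualTimeTest {
    private val testDispatcher = TestCoroutineDispatcher()

    @Test
    fun delay_advancesVirtualClockOnly() = testDispatcher.runBlockingTest {
        // currentTime is the dispatcher's virtual time in milliseconds
        val start = currentTime
        // Resumes immediately; only the virtual clock moves forward
        delay(1000)
        assertEquals(1000L, currentTime - start)
    }
}
```

The test finishes without actually waiting one second, while the virtual clock still reports that 1000 ms have passed.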
But wait, we used Dispatchers.Main, which is primarily suited for UI operations, to make network requests in the ViewModel. Let's go back and replace it with Dispatchers.IO, which is a better choice for network requests.
class WeatherForecastOneShotViewModel @Inject constructor(
    private val weatherForecastRepository: WeatherForecastRepository
) : ViewModel() {
    private val _weatherForecast = MutableLiveData<Result<Int>>()
    ...
    private fun fetchWeatherForecast() {
        // Set value as loading
        _weatherForecast.value = Result.Loading
        // Changed dispatcher to Dispatchers.IO
        viewModelScope.launch(Dispatchers.IO) {
            // Fetch and update weather forecast LiveData
            // postValue is used because setValue must be called on the main thread
            _weatherForecast.postValue(
                weatherForecastRepository.fetchWeatherForecastSuspendCase()
            )
        }
    }
}
After replacing the dispatcher with Dispatchers.IO, the test fails again. The reason is that we only override the main dispatcher. When you start a new coroutine with another dispatcher, it runs on a different thread that the test dispatcher does not control, so its delay is no longer skipped by the virtual clock and the test races against it.
Can we also override other dispatchers with functions like Dispatchers.setMain? Unfortunately not. This is why we need to inject dispatchers.
Always inject dispatchers for better testability.
class WeatherForecastOneShotViewModel @Inject constructor(
    private val weatherForecastRepository: WeatherForecastRepository,
    private val dispatcher: CoroutineDispatcher
) : ViewModel() {
    private val _weatherForecast = MutableLiveData<Result<Int>>()
    ...
    private fun fetchWeatherForecast() {
        // Set value as loading
        _weatherForecast.value = Result.Loading
        // Use the injected dispatcher
        viewModelScope.launch(dispatcher) {
            // Fetch and update weather forecast LiveData
            // postValue is safe whichever thread the injected dispatcher uses
            _weatherForecast.postValue(
                weatherForecastRepository.fetchWeatherForecastSuspendCase()
            )
        }
    }
}
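In production, something has to supply that CoroutineDispatcher. A minimal sketch of one way to do it, assuming the project uses Dagger (the @Inject constructor suggests a JSR-330 container, but the module below and its names are hypothetical):

```kotlin
import dagger.Module
import dagger.Provides
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers

// Hypothetical module; not taken from the sample project.
// Object modules without @JvmStatic assume Dagger 2.26 or newer.
@Module
object DispatcherModule {
    // Production code gets Dispatchers.IO, while tests bypass Dagger
    // and pass a TestCoroutineDispatcher to the constructor directly
    @Provides
    fun provideDispatcher(): CoroutineDispatcher = Dispatchers.IO
}
```

If a project needs more than one dispatcher, each binding would need its own qualifier annotation so Dagger can tell them apart.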
We need to pass the same test dispatcher to the ViewModel when it is created in the test.
@Before
fun setup() {
    ...
    viewModel = WeatherForecastOneShotViewModel(
        weatherForecastRepository,
        // Use the same test dispatcher
        // which is used to run the tests
        mainCoroutineRule.testDispatcher
    )
}
After that, our test passes even though the ViewModel no longer launches its coroutine on the main dispatcher.
It's time to remove the delay that waits for the API call. Since we are using runBlockingTest and every delay is skipped immediately, you could write a 100-second delay such as delay(100000), which should be longer than any of your API response times, and check the result after that delay. But that is a very bad solution.
Luckily, you can go ahead and pause the dispatcher from starting new coroutines while doing some testing. This allows us to control new coroutines that are started in the test function.
@Test
fun weatherForecastLiveData_ShouldPostLoadingThenSuccessWithoutDelay() {
    mainCoroutineRule.testDispatcher.runBlockingTest {
        // Pause dispatcher so that no new coroutines will be started
        mainCoroutineRule.testDispatcher.pauseDispatcher()
        // Initialization of view model includes api call with creating
        // new coroutine but it will not be invoked since we paused dispatcher
        viewModel = WeatherForecastOneShotViewModel(
            weatherForecastRepository,
            mainCoroutineRule.testDispatcher
        )
        // Check whether the first value is loading
        assert(viewModel.weatherForecast.value == Result.Loading)
        // Resume dispatcher
        // This will resume from the part which starts a new coroutine
        // to make the api call in view model
        mainCoroutineRule.testDispatcher.resumeDispatcher()
        // Check whether the response is successful
        assert(viewModel.weatherForecast.value is Result.Success)
    }
}
We just controlled the execution of the coroutine and our test passed.
Test Flow
Testing Flow is much easier than testing LiveData & Coroutines. Since Flow provides all the data in the same coroutine context where it is collected, there are no race conditions on the collector side. So, this allows us to test the data sequentially without pausing or resuming anything.
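The sequential behaviour can be seen without any test library. A minimal sketch, assuming only kotlinx-coroutines-core on the classpath; the recordEmissionOrder function and the event strings are made up for illustration:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Records what the emitter and the collector do, in the order it happens
fun recordEmissionOrder(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val numbers = flow {
        events += "emit 1"
        emit(1)
        delay(10)
        events += "emit 2"
        emit(2)
    }
    // The flow body runs inside this collect call, in the collector's coroutine
    numbers.collect { value -> events += "collect $value" }
    events
}

fun main() {
    // prints [emit 1, collect 1, emit 2, collect 2]
    println(recordEmissionOrder())
}
```

Each emit hands its value to the collector before the flow body continues, so a test can assert on the values in order.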
Let's use this simple function that returns a stream of data in a repository.
/**
 * This method is used to make a one-shot request to get
 * fake weather forecast data
 */
fun fetchWeatherForecast() = flow {
    emit(Result.Loading)
    // Fake api call
    delay(1000)
    // Send a random fake weather forecast data
    emit(Result.Success((0..20).random()))
}
We will test whether this function emits Loading first and then a successful response. We can use the collect terminal operator to start collecting data in the test.
@Test
fun fetchWeatherForecast_ShouldReturnLoadingThenResult_collect() {
    mainCoroutineRule.testDispatcher.runBlockingTest {
        // Make api call
        val weatherForecastFlow = weatherForecastRepository.fetchWeatherForecast()
        weatherForecastFlow.collect {
            // Check whether first data is loading
            // Check whether second data is Success
        }
    }
}
Since collect does not tell us the index of each emitted value, let's use the collectIndexed operator.
@Test
fun fetchWeatherForecast_ShouldReturnLoadingThenResult_collectIndexed() {
    mainCoroutineRule.testDispatcher.runBlockingTest {
        // Make api call
        val weatherForecastFlow = weatherForecastRepository.fetchWeatherForecast()
        weatherForecastFlow.collectIndexed { index, value ->
            // Check whether first data is loading
            if (index == 0) assert(value == Result.Loading)
            // Check whether second data is Success
            if (index == 1) assert(value is Result.Success)
        }
    }
}
Alternatively, you can convert the Flow to a list with the toList operator and test the list.
@Test
fun fetchWeatherForecast_ShouldReturnLoadingThenResult_toList() {
    mainCoroutineRule.testDispatcher.runBlockingTest {
        // List to keep weather forecast values
        val weatherForecastList = mutableListOf<Result<Int>>()
        // Make api call
        val weatherForecastFlow = weatherForecastRepository.fetchWeatherForecast()
        // Convert flow to a list
        weatherForecastFlow.toList(weatherForecastList)
        // Check whether first data is loading
        assert(weatherForecastList.first() == Result.Loading)
        // Check whether second (last) data is Success
        assert(weatherForecastList.last() is Result.Success)
    }
}
Controlling and testing data streams becomes very easy with Flow operators.
Note: You can convert LiveData to Flow with the asFlow extension and test the Flow instead of the LiveData. The created Flow first emits the latest LiveData value and then observes further updates. Since fetching the response and updating the LiveData happen during ViewModel initialization in our case, earlier values may already be gone by the time the Flow is collected, so it is not suitable for our test case.
If you are testing LiveData with Coroutines, make sure to inject all dispatchers and use TestCoroutineDispatcher. Also, don't forget to use runBlockingTest to skip delays in your tests. If the code under test starts a new coroutine, use the pause and resume functions of the test dispatcher to properly test LiveData values. Additionally, you can choose to use the asFlow extension and test Flow instead of LiveData.
Source
https://proandroiddev.com/using-livedata-flow-in-mvvm-part-iii-8703d305ca73