Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ internal class ChannelStateImpl(
private val _quotedMessagesMap = MutableStateFlow<Map<String, Set<String>>>(emptyMap())
private val _messages = MutableStateFlow<List<Message>>(emptyList())
private val localOnlyMessages = MutableStateFlow<List<Message>>(emptyList())

/**
* Tracks the creation date of the current user's most recent thread-only reply. Thread-only
* replies are excluded from [_messages], so the cooldown derivation tracks them here separately.
*/
private val _lastSentThreadReplyDate = MutableStateFlow<Date?>(null)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
private val _pendingEnabled = MutableStateFlow(false)
private val _pendingMessages = MutableStateFlow<List<Message>>(emptyList())

Expand Down Expand Up @@ -262,11 +268,15 @@ internal class ChannelStateImpl(

override val insideSearch: StateFlow<Boolean> = _insideSearch.asStateFlow()

override val lastSentMessageDate: StateFlow<Date?> = combineStates(channelConfig, messages) { config, messages ->
messages
.filter { it.user.id == currentUser.value?.id }
.lastMessageAt(config.skipLastMsgUpdateForSystemMsgs)
}
private val lastSentChannelMessageDate: StateFlow<Date?> =
combineStates(channelConfig, messages) { config, messages ->
messages
.filter { it.user.id == currentUser.value?.id }
.lastMessageAt(config.skipLastMsgUpdateForSystemMsgs)
}

override val lastSentMessageDate: StateFlow<Date?> =
combineStates(lastSentChannelMessageDate, _lastSentThreadReplyDate, ::latestOf)

override val activeLiveLocations: StateFlow<List<Location>> = liveLocations.mapState { locations ->
// Filter locations to only include those for this channel
Expand Down Expand Up @@ -305,12 +315,28 @@ internal class ChannelStateImpl(

// region Messages

/** Advances [_lastSentThreadReplyDate] with [date], never moving it backwards. */
private fun advanceLastSentThreadReplyDate(date: Date?) {
date ?: return
_lastSentThreadReplyDate.update { current -> latestOf(current, date) }
}

private fun trackOwnThreadReply(message: Message) {
val currentUserId = currentUser.value?.id ?: return
advanceLastSentThreadReplyDate(message.ownThreadReplyDate(currentUserId))
}

private fun trackOwnThreadReply(messages: Collection<Message>) {
advanceLastSentThreadReplyDate(messages.latestOwnThreadReplyDate(currentUser.value?.id))
}

/**
* Sets the list of messages (overriding the current one).
*
* @param messages The list of messages to set.
*/
fun setMessages(messages: List<Message>) {
trackOwnThreadReply(messages)
val messagesToSet = messages.filterNot { shouldIgnoreUpsertion(it) }
for (message in messagesToSet) {
message.replyTo?.let { addQuotedMessage(it.id, message.id) }
Expand All @@ -329,6 +355,7 @@ internal class ChannelStateImpl(
* @param message The message to upsert.
*/
fun upsertMessage(message: Message) {
trackOwnThreadReply(message)
if (shouldIgnoreUpsertion(message)) return
message.replyTo?.let { addQuotedMessage(it.id, message.id) }
message.replyMessageId?.let { addQuotedMessage(it, message.id) }
Expand Down Expand Up @@ -367,6 +394,7 @@ internal class ChannelStateImpl(
* for pagination performance; set to `true` for reconnection/sync paths where messages may overlap.
*/
fun upsertMessages(messages: List<Message>, preserveAttachmentUrls: Boolean = false) {
trackOwnThreadReply(messages)
val messagesToUpsert = messages.filterNot { shouldIgnoreUpsertion(it) }
if (messagesToUpsert.isEmpty()) return
for (message in messagesToUpsert) {
Expand Down Expand Up @@ -1500,6 +1528,7 @@ internal class ChannelStateImpl(
_repliedMessage.value = null
_quotedMessagesMap.value = emptyMap()
_messages.value = emptyList()
_lastSentThreadReplyDate.value = null
_pendingMessages.value = emptyList()
_pendingEnabled.value = false
_cachedLatestMessages.value = emptyList()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import io.getstream.chat.android.models.User
import io.getstream.log.taggedLogger
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.update
import java.util.Date
import java.util.concurrent.atomic.AtomicInteger

Expand Down Expand Up @@ -79,6 +80,13 @@ internal class ChannelStateLegacyImpl(
private var messageLimit: Int? = baseMessageLimit

private var _messages: MutableStateFlow<Map<String, Message>>? = MutableStateFlow(emptyMap())

/**
* Tracks the creation date of the current user's most recent thread-only reply. Thread-only
* replies are excluded from the visible [messages] list, so the cooldown derivation tracks them
* here separately.
*/
private val _lastSentThreadReplyDate = MutableStateFlow<Date?>(null)
private var _pinnedMessages: MutableStateFlow<Map<String, Message>>? = MutableStateFlow(emptyMap())
private var _typing: MutableStateFlow<TypingEvent>? = MutableStateFlow(TypingEvent(channelId, emptyList()))
private var _rawReads: MutableStateFlow<Map<String, ChannelUserRead>>? = MutableStateFlow(emptyMap())
Expand Down Expand Up @@ -252,7 +260,7 @@ internal class ChannelStateLegacyImpl(

override val insideSearch: StateFlow<Boolean> = _insideSearch!!

override val lastSentMessageDate: StateFlow<Date?> = combineStates(
private val lastSentChannelMessageDate: StateFlow<Date?> = combineStates(
userFlow,
channelConfig,
messages,
Expand All @@ -264,6 +272,9 @@ internal class ChannelStateLegacyImpl(
}
}

override val lastSentMessageDate: StateFlow<Date?> =
combineStates(lastSentChannelMessageDate, _lastSentThreadReplyDate, ::latestOf)

override fun toChannel(): Channel {
// recreate a channel object from the various observables.
return channelData.value
Expand Down Expand Up @@ -609,7 +620,18 @@ internal class ChannelStateLegacyImpl(
setPinned { pinned -> pinned.filter { it.value.wasCreatedAfter(date) } }
}

/** Advances [_lastSentThreadReplyDate] with [date], never moving it backwards. */
private fun advanceLastSentThreadReplyDate(date: Date?) {
date ?: return
_lastSentThreadReplyDate.update { current -> latestOf(current, date) }
}

private fun trackOwnThreadReply(messages: Collection<Message>) {
advanceLastSentThreadReplyDate(messages.latestOwnThreadReplyDate(userFlow.value?.id))
}

fun upsertMessages(updatedMessages: Collection<Message>) {
trackOwnThreadReply(updatedMessages)
_messages?.apply {
val newMessageList = (value + (updatedMessages.associateBy(Message::id) - deletedMessagesIds)).values
value = applyMessageLimitIfNeeded(newMessageList).associateBy(Message::id)
Expand All @@ -624,6 +646,7 @@ internal class ChannelStateLegacyImpl(
}

fun setMessages(messages: List<Message>) {
trackOwnThreadReply(messages)
_messages?.value = applyMessageLimitIfNeeded(messages).associateBy(Message::id)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2014-2026 Stream.io Inc. All rights reserved.
*
* Licensed under the Stream License;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://github.com/GetStream/stream-chat-android/blob/main/LICENSE
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.getstream.chat.android.client.internal.state.plugin.state.channel.internal

import io.getstream.chat.android.models.Message
import java.util.Date

/**
* Returns the more recent of two nullable dates, or `null` when both are `null`.
*/
internal fun latestOf(first: Date?, second: Date?): Date? = when {
first == null -> second
second == null -> first
else -> maxOf(first, second)
}

/**
* Returns the creation date of this message when it is a thread-only reply sent by [currentUserId],
* or `null` otherwise.
*
* The channel cooldown counts thread replies the same as channel messages, but thread-only replies
* (`parentId != null && !showInChannel`) are excluded from the channel message list, so the cooldown
* derivation tracks them separately. Shadowed messages are excluded to match the channel message
* date derivation.
*/
internal fun Message.ownThreadReplyDate(currentUserId: String): Date? = when {
user.id != currentUserId -> null
parentId == null || showInChannel -> null
shadowed -> null
else -> createdLocallyAt ?: createdAt
}

/**
* Returns the creation date of the most recent thread-only reply sent by [currentUserId] in this
* collection, or `null` when there is none.
*/
internal fun Collection<Message>.latestOwnThreadReplyDate(currentUserId: String?): Date? {
currentUserId ?: return null
return asSequence()
.mapNotNull { it.ownThreadReplyDate(currentUserId) }
.maxOrNull()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright (c) 2014-2026 Stream.io Inc. All rights reserved.
*
* Licensed under the Stream License;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://github.com/GetStream/stream-chat-android/blob/main/LICENSE
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.getstream.chat.android.client.internal.state.plugin.state.channel.internal

import io.getstream.chat.android.randomUser
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.Assertions.assertNull
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test

internal class ChannelStateImplLastSentMessageDateTest : ChannelStateImplTestBase() {

@Test
fun `a thread-only reply by the current user updates lastSentMessageDate`() = runTest {
val threadReply = createMessage(1, parentId = "parent1", showInChannel = false)

channelState.upsertMessage(threadReply)

// the reply is excluded from the visible message list, but still drives the cooldown
assertTrue(channelState.messages.value.isEmpty())
assertEquals(threadReply.createdAt, channelState.lastSentMessageDate.value)
}

@Test
fun `a thread-only reply via upsertMessages updates lastSentMessageDate`() = runTest {
val threadReply = createMessage(1, parentId = "parent1", showInChannel = false)

channelState.upsertMessages(listOf(threadReply))

assertEquals(threadReply.createdAt, channelState.lastSentMessageDate.value)
}

@Test
fun `a thread-only reply from another user does not update lastSentMessageDate`() = runTest {
val otherUserReply = createMessage(1, user = randomUser(), parentId = "parent1", showInChannel = false)

channelState.upsertMessage(otherUserReply)

assertNull(channelState.lastSentMessageDate.value)
}

@Test
fun `a channel message by the current user still updates lastSentMessageDate`() = runTest {
val channelMessage = createMessage(1)

channelState.upsertMessage(channelMessage)

assertEquals(channelMessage.createdAt, channelState.lastSentMessageDate.value)
}

@Test
fun `lastSentMessageDate is the later of the channel message and the thread reply`() = runTest {
val olderThreadReply = createMessage(1, parentId = "parent1", showInChannel = false)
val newerChannelMessage = createMessage(5)

channelState.upsertMessage(olderThreadReply)
channelState.upsertMessage(newerChannelMessage)

assertEquals(newerChannelMessage.createdAt, channelState.lastSentMessageDate.value)
}

@Test
fun `a newer thread reply wins over an older channel message`() = runTest {
val olderChannelMessage = createMessage(1)
val newerThreadReply = createMessage(5, parentId = "parent1", showInChannel = false)

channelState.upsertMessage(olderChannelMessage)
channelState.upsertMessage(newerThreadReply)

assertEquals(newerThreadReply.createdAt, channelState.lastSentMessageDate.value)
}

@Test
fun `the thread reply date survives a later refresh that omits it`() = runTest {
val threadReply = createMessage(5, parentId = "parent1", showInChannel = false)
channelState.upsertMessage(threadReply)

// a server refresh replaces the message list with channel messages only (no thread replies)
channelState.setMessages(createMessages(count = 2, startIndex = 1))

assertEquals(threadReply.createdAt, channelState.lastSentMessageDate.value)
}

@Test
fun `destroy clears the thread-reply contribution to lastSentMessageDate`() = runTest {
channelState.upsertMessage(createMessage(1, parentId = "parent1", showInChannel = false))
assertNotNull(channelState.lastSentMessageDate.value)

channelState.destroy()

assertNull(channelState.lastSentMessageDate.value)
}
}
Loading
Loading