From 6c64ae4ea366e0490c91a66736b78d4e2e1a33d5 Mon Sep 17 00:00:00 2001 From: wsforever Date: Mon, 11 Aug 2025 17:43:47 +0800 Subject: [PATCH 1/2] mq unlock when delete from process queue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit when delete mq from process queue , if not unlock , then the mq will lock failed and can't add into process queue when the new consumer allocate this mq until 60s lock timeout。 --- consumer/consumer.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/consumer/consumer.go b/consumer/consumer.go index 9e9bedb2..a0bdf4b6 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -436,6 +436,14 @@ func (dc *defaultConsumer) doBalance() { dc.option.PullInterval.Store(time.Duration(float64(time.Second) / pullTimesPerSecond)) } + oldMqSet := make([]primitive.MessageQueue, 0) + if dc.consumeOrderly { + dc.processQueueTable.Range(func(key, value interface{}) bool { + oldMqSet = append(oldMqSet, key.(primitive.MessageQueue)) + return true + }) + } + changed := dc.updateProcessQueueTable(topic, allocateResult) if changed { dc.mqChanged(topic, mqAll, allocateResult) @@ -449,6 +457,21 @@ func (dc *defaultConsumer) doBalance() { "rebalanceResultSet": allocateResult, }) } + + if dc.consumeOrderly { + for _, mq := range oldMqSet { + match := false + for _, newMq := range allocateResult { + if mq.String() == newMq.String() { + match = true + break + } + } + if !match { + dc.unlock(&mq, true) + } + } + } } return true }) From c38b9f26fc6a67acb5fc838d0b57e138a16f0066 Mon Sep 17 00:00:00 2001 From: wsforever Date: Fri, 5 Sep 2025 15:57:15 +0800 Subject: [PATCH 2/2] unlock unused queue when orderly consumer rebalance unlock unused queue when orderly consumer rebalance --- consumer/consumer.go | 42 +++++++++++++++++++----------------------- 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/consumer/consumer.go b/consumer/consumer.go index a0bdf4b6..7d22b797 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -436,14 +436,6 @@ func (dc *defaultConsumer) doBalance() { dc.option.PullInterval.Store(time.Duration(float64(time.Second) / pullTimesPerSecond)) } - oldMqSet := make([]primitive.MessageQueue, 0) - if dc.consumeOrderly { - dc.processQueueTable.Range(func(key, value interface{}) bool { - oldMqSet = append(oldMqSet, key.(primitive.MessageQueue)) - return true - }) - } - changed := dc.updateProcessQueueTable(topic, allocateResult) if changed { dc.mqChanged(topic, mqAll, allocateResult) @@ -456,21 +448,6 @@ func (dc *defaultConsumer) doBalance() { "rebalanceResultSize": len(allocateResult), "rebalanceResultSet": allocateResult, }) - } - - if dc.consumeOrderly { - for _, mq := range oldMqSet { - match := false - for _, newMq := range allocateResult { - if mq.String() == newMq.String() { - match = true - break - } - } - if !match { - dc.unlock(&mq, true) - } - } } } return true @@ -732,10 +709,12 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*primitiv mq := key.(primitive.MessageQueue) pq := value.(*processQueue) if mq.Topic == topic { + unlockMqs := make([]*primitive.MessageQueue, 0, 1) if !mqSet[mq] { pq.WithDropped(true) if dc.removeUnnecessaryMessageQueue(&mq, pq) { dc.processQueueTable.Delete(key) + unlockMqs = append(unlockMqs, &mq) changed = true rlog.Info("remove unnecessary mq when updateProcessQueueTable", map[string]interface{}{ rlog.LogKeyConsumerGroup: dc.consumerGroup, @@ -746,6 +725,7 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*primitiv pq.WithDropped(true) if dc.removeUnnecessaryMessageQueue(&mq, pq) { dc.processQueueTable.Delete(key) + unlockMqs = append(unlockMqs, &mq) changed = true rlog.Warning("remove unnecessary mq because pull was expired, prepare to fix it", map[string]interface{}{ rlog.LogKeyConsumerGroup: dc.consumerGroup, @@ -753,6 +733,22 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*primitiv }) } } + + if dc.consumeOrderly && len(unlockMqs) > 0 { + // 释放掉不再订阅的mq的锁,不再订阅的mq已经在上面被删除了 + go func() { + brokerResult := dc.client.GetNameSrv().FindBrokerAddressInSubscribe(mq.BrokerName, internal.MasterId, true) + + if brokerResult != nil { + body := &lockBatchRequestBody{ + ConsumerGroup: dc.consumerGroup, + ClientId: dc.client.ClientID(), + MQs: unlockMqs, + } + dc.doUnlock(brokerResult.BrokerAddr, body, false) + } + }() + } } return true })