Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func NewAdmin(opts ...AdminOption) (*admin, error) {

cli := internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)
if cli == nil {
return nil, fmt.Errorf("GetOrNewRocketMQClient faild")
return nil, fmt.Errorf("GetOrNewRocketMQClient failed")
}
defaultOpts.Namesrv = cli.GetNameSrv()
//log.Printf("Client: %#v", namesrv.srvs)
Expand Down
2 changes: 1 addition & 1 deletion consumer/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func newTraceInterceptor(dispatcher internal.TraceDispatcher) primitive.Intercep

return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
if dispatcher == nil {
return fmt.Errorf("GetOrNewRocketMQClient faild")
return fmt.Errorf("GetOrNewRocketMQClient failed")
}
consumerCtx, exist := primitive.GetConsumerCtx(ctx)
if !exist || len(consumerCtx.Msgs) == 0 {
Expand Down
2 changes: 1 addition & 1 deletion consumer/pull_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
allocate: defaultOpts.Strategy,
}
if dc.client == nil {
return nil, fmt.Errorf("GetOrNewRocketMQClient faild")
return nil, fmt.Errorf("GetOrNewRocketMQClient failed")
}
defaultOpts.Namesrv = dc.client.GetNameSrv()

Expand Down
2 changes: 1 addition & 1 deletion consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
option: defaultOpts,
}
if dc.client == nil {
return nil, fmt.Errorf("GetOrNewRocketMQClient faild")
return nil, fmt.Errorf("GetOrNewRocketMQClient failed")
}
defaultOpts.Namesrv = dc.client.GetNameSrv()

Expand Down
2 changes: 1 addition & 1 deletion examples/consumer/interceptor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func main() {
func UserFistInterceptor() primitive.Interceptor {
return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
msgCtx, _ := primitive.GetConsumerCtx(ctx)
fmt.Printf("msgCtx: %v, mehtod: %s", msgCtx, primitive.GetMethod(ctx))
fmt.Printf("msgCtx: %v, method: %s", msgCtx, primitive.GetMethod(ctx))

msgs := req.([]*primitive.MessageExt)
fmt.Printf("user first interceptor before invoke: %v\n", msgs)
Expand Down
2 changes: 1 addition & 1 deletion examples/producer/rpc/async/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func main() {
fmt.Printf("request to <%s> fail, err:%v \n", topic, respErr)
return
}
fmt.Printf("Requst to %s cost:%d ms responseMsg:%s\n", topic, time.Since(now)/time.Millisecond, responseMsg.String())
fmt.Printf("Request to %s cost:%d ms responseMsg:%s\n", topic, time.Since(now)/time.Millisecond, responseMsg.String())
}
err = p.RequestAsync(context.Background(), ttl, f, msg)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion examples/producer/rpc/sync/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func main() {
fmt.Printf("Request message error: %s\n", err)
return
}
fmt.Printf("Requst to %s cost:%d ms responseMsg:%s\n", topic, time.Since(now)/time.Millisecond, responseMsg.String())
fmt.Printf("Request to %s cost:%d ms responseMsg:%s\n", topic, time.Since(now)/time.Millisecond, responseMsg.String())

err = p.Shutdown()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions examples/producer/transaction/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (dl *DemoListener) ExecuteLocalTransaction(msg *primitive.Message) primitiv
dl.localTrans.Store(msg.TransactionId, primitive.LocalTransactionState(status+1))

fmt.Printf("dl")
return primitive.UnkonwnState
return primitive.UnknownState
}

func (dl *DemoListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
Expand All @@ -69,7 +69,7 @@ func (dl *DemoListener) CheckLocalTransaction(msg *primitive.MessageExt) primiti
return primitive.RollbackMessageState
case 3:
fmt.Printf("checkLocalTransaction unknown: %v\n", msg)
return primitive.UnkonwnState
return primitive.UnknownState
default:
fmt.Printf("checkLocalTransaction default COMMIT_MESSAGE: %v\n", msg)
return primitive.CommitMessageState
Expand Down
8 changes: 4 additions & 4 deletions internal/remote/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,10 +298,10 @@ func TestCommandJsonEncodeDecode(t *testing.T) {
t.Errorf("wrong command version. want=%d, got=%d", cmd.Version, newCmd.Version)
}
if newCmd.Opaque != cmd.Opaque {
t.Errorf("wrong command version. want=%d, got=%d", cmd.Opaque, newCmd.Opaque)
t.Errorf("wrong command opaque. want=%d, got=%d", cmd.Opaque, newCmd.Opaque)
}
if newCmd.Flag != cmd.Flag {
t.Errorf("wrong commad flag. want=%d, got=%d", cmd.Flag, newCmd.Flag)
t.Errorf("wrong command flag. want=%d, got=%d", cmd.Flag, newCmd.Flag)
}
if newCmd.Remark != cmd.Remark {
t.Errorf("wrong command remakr. want=%s, got=%s", cmd.Remark, newCmd.Remark)
Expand Down Expand Up @@ -335,10 +335,10 @@ func TestCommandRocketMQEncodeDecode(t *testing.T) {
t.Errorf("wrong command version. want=%d, got=%d", cmd.Version, newCmd.Version)
}
if newCmd.Opaque != cmd.Opaque {
t.Errorf("wrong command version. want=%d, got=%d", cmd.Opaque, newCmd.Opaque)
t.Errorf("wrong command opaque. want=%d, got=%d", cmd.Opaque, newCmd.Opaque)
}
if newCmd.Flag != cmd.Flag {
t.Errorf("wrong commad flag. want=%d, got=%d", cmd.Flag, newCmd.Flag)
t.Errorf("wrong command flag. want=%d, got=%d", cmd.Flag, newCmd.Flag)
}
if newCmd.Remark != cmd.Remark {
t.Errorf("wrong command remakr. want=%s, got=%s", cmd.Remark, newCmd.Remark)
Expand Down
40 changes: 20 additions & 20 deletions internal/remote/remote_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestNewResponseFuture(t *testing.T) {
t.Errorf("wrong ResponseFuture's opaque. want=%d, got=%d", 10, future.Opaque)
}
if future.Err != nil {
t.Errorf("wrong RespnseFuture's Err. want=<nil>, got=%v", future.Err)
t.Errorf("wrong ResponseFuture's Err. want=<nil>, got=%v", future.Err)
}
if future.callback != nil {
t.Errorf("wrong ResponseFuture's callback. want=<nil>, got!=<nil>")
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestResponseFutureWaitResponse(t *testing.T) {
t.Errorf("wrong ResponseFuture waitResponse error: %v", err)
} else {
if r != responseRemotingCommand {
t.Errorf("wrong ResponseFuture waitResposne result. want=%v, got=%v",
t.Errorf("wrong ResponseFuture waitResponse result. want=%v, got=%v",
responseRemotingCommand, r)
}
}
Expand Down Expand Up @@ -145,9 +145,9 @@ func TestCreateScanner(t *testing.T) {
func TestInvokeSync(t *testing.T) {
addr := ":3004"

clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello RocketMQ"))
clientSendRemotingCommand := NewRemotingCommand(10, nil, []byte("Hello RocketMQ"))
serverSendRemotingCommand := NewRemotingCommand(20, nil, []byte("Welcome native"))
serverSendRemotingCommand.Opaque = clientSendRemtingCommand.Opaque
serverSendRemotingCommand.Opaque = clientSendRemotingCommand.Opaque
serverSendRemotingCommand.Flag = ResponseType
var wg sync.WaitGroup
wg.Add(1)
Expand All @@ -159,7 +159,7 @@ func TestInvokeSync(t *testing.T) {
go func() {
clientSend.Wait()
receiveCommand, err := client.InvokeSync(context.Background(), addr,
clientSendRemtingCommand)
clientSendRemotingCommand)
if err != nil {
t.Fatalf("failed to invoke synchronous. %s", err)
} else {
Expand Down Expand Up @@ -190,19 +190,19 @@ func TestInvokeSync(t *testing.T) {
for scanner.Scan() {
receivedRemotingCommand, err := decode(scanner.Bytes())
if err != nil {
t.Errorf("failed to decode RemotingCommnad. %s", err)
t.Errorf("failed to decode RemotingCommand. %s", err)
}
if clientSendRemtingCommand.Code != receivedRemotingCommand.Code {
if clientSendRemotingCommand.Code != receivedRemotingCommand.Code {
t.Errorf("wrong code. want=%d, got=%d", receivedRemotingCommand.Code,
clientSendRemtingCommand.Code)
clientSendRemotingCommand.Code)
}
body, err := encode(serverSendRemotingCommand)
if err != nil {
t.Fatalf("failed to encode RemotingCommand")
}
_, err = conn.Write(body)
if err != nil {
t.Fatalf("failed to write body to conneciton.")
t.Fatalf("failed to write body to connection.")
}
goto done
}
Expand All @@ -220,10 +220,10 @@ func TestInvokeAsync(t *testing.T) {
for i := 0; i < cnt; i++ {
go func(index int) {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
t.Logf("[Send: %d] asychronous message", index)
t.Logf("[Send: %d] asynchronous message", index)
sendRemotingCommand := randomNewRemotingCommand()
err := client.InvokeAsync(context.Background(), addr, sendRemotingCommand, func(r *ResponseFuture) {
t.Logf("[Receive: %d] asychronous message response", index)
t.Logf("[Receive: %d] asynchronous message response", index)
if string(sendRemotingCommand.Body) != string(r.ResponseCommand.Body) {
t.Errorf("wrong response message. want=%s, got=%s", string(sendRemotingCommand.Body),
string(r.ResponseCommand.Body))
Expand Down Expand Up @@ -275,9 +275,9 @@ done:
func TestInvokeAsyncTimeout(t *testing.T) {
addr := ":3002"

clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello RocketMQ"))
clientSendRemotingCommand := NewRemotingCommand(10, nil, []byte("Hello RocketMQ"))
serverSendRemotingCommand := NewRemotingCommand(20, nil, []byte("Welcome native"))
serverSendRemotingCommand.Opaque = clientSendRemtingCommand.Opaque
serverSendRemotingCommand.Opaque = clientSendRemotingCommand.Opaque
serverSendRemotingCommand.Flag = ResponseType

var wg sync.WaitGroup
Expand All @@ -290,7 +290,7 @@ func TestInvokeAsyncTimeout(t *testing.T) {
clientSend.Wait()
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(10*time.Second))
defer cancel()
err := client.InvokeAsync(ctx, addr, clientSendRemtingCommand,
err := client.InvokeAsync(ctx, addr, clientSendRemotingCommand,
func(r *ResponseFuture) {
assert.NotNil(t, r.Err)
assert.Equal(t, errors.ErrRequestTimeout, r.Err)
Expand All @@ -313,7 +313,7 @@ func TestInvokeAsyncTimeout(t *testing.T) {
for scanner.Scan() {
t.Logf("receive request.")
_, err := decode(scanner.Bytes())
assert.Nil(t, err, "failed to decode RemotingCommnad.")
assert.Nil(t, err, "failed to decode RemotingCommand.")

time.Sleep(5 * time.Second) // force client timeout
goto done
Expand All @@ -325,7 +325,7 @@ done:

func TestInvokeOneWay(t *testing.T) {
addr := ":3008"
clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello RocketMQ"))
clientSendRemotingCommand := NewRemotingCommand(10, nil, []byte("Hello RocketMQ"))

var wg sync.WaitGroup
wg.Add(1)
Expand All @@ -335,7 +335,7 @@ func TestInvokeOneWay(t *testing.T) {
clientSend.Add(1)
go func() {
clientSend.Wait()
err := client.InvokeOneWay(context.Background(), addr, clientSendRemtingCommand)
err := client.InvokeOneWay(context.Background(), addr, clientSendRemotingCommand)
if err != nil {
t.Fatalf("failed to invoke synchronous. %s", err)
}
Expand All @@ -358,11 +358,11 @@ func TestInvokeOneWay(t *testing.T) {
for scanner.Scan() {
receivedRemotingCommand, err := decode(scanner.Bytes())
if err != nil {
t.Errorf("failed to decode RemotingCommnad. %s", err)
t.Errorf("failed to decode RemotingCommand. %s", err)
}
if clientSendRemtingCommand.Code != receivedRemotingCommand.Code {
if clientSendRemotingCommand.Code != receivedRemotingCommand.Code {
t.Errorf("wrong code. want=%d, got=%d", receivedRemotingCommand.Code,
clientSendRemtingCommand.Code)
clientSendRemotingCommand.Code)
}
goto done
}
Expand Down
6 changes: 3 additions & 3 deletions primitive/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ const (
PropertyUniqueClientMessageIdKeyIndex = "UNIQ_KEY"
PropertyMaxReconsumeTimes = "MAX_RECONSUME_TIMES"
PropertyConsumeStartTime = "CONSUME_START_TIME"
PropertyTranscationPreparedQueueOffset = "TRAN_PREPARED_QUEUE_OFFSET"
PropertyTranscationCheckTimes = "TRANSACTION_CHECK_TIMES"
PropertyTransactionPreparedQueueOffset = "TRAN_PREPARED_QUEUE_OFFSET"
PropertyTransactionCheckTimes = "TRANSACTION_CHECK_TIMES"
PropertyCheckImmunityTimeInSeconds = "CHECK_IMMUNITY_TIME_IN_SECONDS"
PropertyShardingKey = "SHARDING_KEY"
PropertyTransactionID = "__transactionId__"
Expand Down Expand Up @@ -451,7 +451,7 @@ type LocalTransactionState int
const (
CommitMessageState LocalTransactionState = iota + 1
RollbackMessageState
UnkonwnState
UnknownState
)

type TransactionListener interface {
Expand Down
2 changes: 1 addition & 1 deletion producer/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func newTraceInterceptor(dispatcher internal.TraceDispatcher) primitive.Intercep

return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
if dispatcher == nil {
return fmt.Errorf("GetOrNewRocketMQClient faild")
return fmt.Errorf("GetOrNewRocketMQClient failed")
}
beginT := time.Now()
producerCtx, ok := primitive.GetProducerCtx(ctx)
Expand Down
6 changes: 3 additions & 3 deletions producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func NewDefaultProducer(opts ...Option) (*defaultProducer, error) {
}
producer.client = internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, producer.callbackCh)
if producer.client == nil {
return nil, fmt.Errorf("GetOrNewRocketMQClient faild")
return nil, fmt.Errorf("GetOrNewRocketMQClient failed")
}
defaultOpts.Namesrv = producer.client.GetNameSrv()

Expand Down Expand Up @@ -714,7 +714,7 @@ func (tp *transactionProducer) SendMessageInTransaction(ctx context.Context, msg
if err != nil {
return nil, err
}
localTransactionState := primitive.UnkonwnState
localTransactionState := primitive.UnknownState
switch rsp.Status {
case primitive.SendOK:
if len(rsp.TransactionID) > 0 {
Expand Down Expand Up @@ -785,7 +785,7 @@ func (tp *transactionProducer) transactionState(state primitive.LocalTransaction
return primitive.TransactionCommitType
case primitive.RollbackMessageState:
return primitive.TransactionRollbackType
case primitive.UnkonwnState:
case primitive.UnknownState:
return primitive.TransactionNotType
default:
return primitive.TransactionNotType
Expand Down