Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 33 additions & 16 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ type Config struct {
ClientCAs []string `json:"ClientCAs" yaml:"ClientCAs"`
Port uint16 `json:"Port" yaml:"Port"`
ServerID string `json:"ServerId" yaml:"ServerId"`
JoinAddr string `json:"JoinAddr" yaml:"JoinAddr"`
JoinAddr []string `json:"JoinAddr" yaml:"JoinAddr"`
AdvertiseAddr string `json:"AdvertiseAddr" yaml:"AdvertiseAddr"`
BindAddr string `json:"BindAddr" yaml:"BindAddr"`
DataDir string `json:"DataDir" yaml:"DataDir"`
BootstrapCluster bool `json:"BootstrapCluster" yaml:"BootstrapCluster"`
Expand All @@ -61,8 +62,10 @@ type Config struct {
CommitTimeout time.Duration `json:"CommitTimeout" yaml:"CommitTimeout"`
Modules []string `json:"Plugins" yaml:"Plugins"`
DiscoveryPort uint16 `json:"DiscoveryPort" yaml:"DiscoveryPort"`
RaftBindAddr string
RaftBindPort uint16
RaftBindAddr string `json:"RaftBindAddr" yaml:"RaftBindAddr"`
RaftBindPort uint16 `json:"RaftBindPort" yaml:"RaftBindPort"`
RaftAdvertiseAddr string `json:"RaftAdvertiseAddr" yaml:"RaftAdvertiseAddr"`
RaftAdvertisePort uint16 `json:"RaftAdvertisePort" yaml:"RaftAdvertisePort"`
}

func GetConfig() (Config, error) {
Expand Down Expand Up @@ -148,12 +151,32 @@ There is no limit by default.`, func(memory string) error {
return nil
})

var joinAddrs []string
flag.Func("join-addr", "Address of cluster member to join. Can be specified multiple times.", func(s string) error {
joinAddrs = append(joinAddrs, s)
return nil
})

internalRaftAddress, e := internal.GetIPAddress()
if e != nil {
return Config{}, e
}

internalFreePort, e := internal.GetFreePort()
if e != nil {
return Config{}, e
}

tls := flag.Bool("tls", false, "Start the echovault in TLS mode. Default is false.")
mtls := flag.Bool("mtls", false, "Use mTLS to verify the client.")
port := flag.Int("port", 7480, "Port to use. Default is 7480")
serverId := flag.String("server-id", "1", "SugarDB ID in raft cluster. Leave empty for client.")
joinAddr := flag.String("join-addr", "", "Address of cluster member in a cluster to you want to join.")
advertiseAddr := flag.String("advertise-addr", "127.0.0.1", "Address to bind the echovault to.")
bindAddr := flag.String("bind-addr", "127.0.0.1", "Address to bind the echovault to.")
raftBindAddr := flag.String("raft-bind-addr", internalRaftAddress, "Raft Address to bind to.")
raftBindPort := flag.Int("raft-bind-port", internalFreePort, "Raft Port to bind to.")
raftAdvertiseAddr := flag.String("raft-advertise-addr", internalRaftAddress, "Raft Address to advertise.")
raftAdvertisePort := flag.Int("raft-advertise-port", internalFreePort, "Raft Port to bind to advertise")
discoveryPort := flag.Uint("discovery-port", 7946, "Port to use for memberlist cluster discovery.")
dataDir := flag.String("data-dir", ".", "Directory to store snapshots and logs.")
bootstrapCluster := flag.Bool("bootstrap-cluster", false, "Whether this instance should bootstrap a new cluster.")
Expand Down Expand Up @@ -191,23 +214,15 @@ It is a plain text value by default but you can provide a SHA256 hash by adding

flag.Parse()

raftBindAddr, e := internal.GetIPAddress()
if e != nil {
return Config{}, e
}
raftBindPort, e := internal.GetFreePort()
if e != nil {
return Config{}, e
}

conf := Config{
CertKeyPairs: certKeyPairs,
ClientCAs: clientCAs,
TLS: *tls,
MTLS: *mtls,
Port: uint16(*port),
ServerID: *serverId,
JoinAddr: *joinAddr,
JoinAddr: joinAddrs,
AdvertiseAddr: *advertiseAddr,
BindAddr: *bindAddr,
DataDir: *dataDir,
BootstrapCluster: *bootstrapCluster,
Expand All @@ -229,8 +244,10 @@ It is a plain text value by default but you can provide a SHA256 hash by adding
CommitTimeout: *commitTimeout,
Modules: modules,
DiscoveryPort: uint16(*discoveryPort),
RaftBindAddr: raftBindAddr,
RaftBindPort: uint16(raftBindPort),
RaftBindAddr: *raftBindAddr,
RaftBindPort: uint16(*raftBindPort),
RaftAdvertiseAddr: *raftAdvertiseAddr,
RaftAdvertisePort: uint16(*raftAdvertisePort),
}

if len(*config) > 0 {
Expand Down
5 changes: 4 additions & 1 deletion internal/config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ func DefaultConfig() Config {
ClientCAs: make([]string, 0),
Port: 7480,
ServerID: "",
JoinAddr: "",
JoinAddr: make([]string, 0),
AdvertiseAddr: "localhost",
BindAddr: "localhost",
RaftBindAddr: raftBindAddr,
RaftBindPort: uint16(raftBindPort),
RaftAdvertiseAddr: raftBindAddr,
RaftAdvertisePort: uint16(raftBindPort),
DiscoveryPort: 7946,
DataDir: ".",
BootstrapCluster: false,
Expand Down
9 changes: 5 additions & 4 deletions internal/memberlist/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ import (
"context"
"encoding/json"
"fmt"
"log"
"time"

"github.com/echovault/sugardb/internal"
"github.com/echovault/sugardb/internal/config"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/raft"
"log"
"time"
)

type Delegate struct {
Expand All @@ -50,8 +51,8 @@ func (delegate *Delegate) NodeMeta(limit int) []byte {
meta := NodeMeta{
ServerID: raft.ServerID(delegate.options.config.ServerID),
RaftAddr: raft.ServerAddress(
fmt.Sprintf("%s:%d", delegate.options.config.RaftBindAddr, delegate.options.config.RaftBindPort)),
MemberlistAddr: fmt.Sprintf("%s:%d", delegate.options.config.BindAddr, delegate.options.config.DiscoveryPort),
fmt.Sprintf("%s:%d", delegate.options.config.RaftAdvertiseAddr, delegate.options.config.RaftAdvertisePort)),
MemberlistAddr: fmt.Sprintf("%s:%d", delegate.options.config.AdvertiseAddr, delegate.options.config.DiscoveryPort),
}

b, err := json.Marshal(&meta)
Expand Down
33 changes: 21 additions & 12 deletions internal/memberlist/event_delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
package memberlist

import (
"encoding/json"
"github.com/hashicorp/memberlist"
"log"

"github.com/hashicorp/memberlist"
)

type EventDelegate struct {
Expand All @@ -42,23 +42,32 @@ func (eventDelegate *EventDelegate) NotifyJoin(node *memberlist.Node) {
}

// NotifyLeave implements EventDelegate interface
// Note: This is called for both graceful leaves and node failures.
// We do NOT remove the node from Raft here to allow nodes to rejoin after maintenance.
// Raft will handle unreachable nodes through its own timeout mechanisms.
func (eventDelegate *EventDelegate) NotifyLeave(node *memberlist.Node) {
eventDelegate.options.decrementNodes()

var meta NodeMeta
// Commented out automatic Raft removal to support maintenance scenarios
// If you need to permanently remove a node, use a manual admin command
/*
var meta NodeMeta

err := json.Unmarshal(node.Meta, &meta)
err := json.Unmarshal(node.Meta, &meta)

if err != nil {
log.Println("Could not get leaving node's metadata.")
return
}
if err != nil {
log.Println("Could not get leaving node's metadata.")
return
}

err = eventDelegate.options.removeRaftServer(meta)
err = eventDelegate.options.removeRaftServer(meta)

if err != nil {
log.Println(err)
}
if err != nil {
log.Println(err)
}
*/

log.Printf("Node %s left the memberlist (but remains in Raft for potential rejoin)\n", node.Name)
}

// NotifyUpdate implements EventDelegate interface
Expand Down
17 changes: 10 additions & 7 deletions internal/memberlist/memberlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ import (
"context"
"crypto/md5"
"fmt"
"github.com/echovault/sugardb/internal"
"github.com/echovault/sugardb/internal/config"
"log"
"sync"
"time"

"github.com/echovault/sugardb/internal"
"github.com/echovault/sugardb/internal/config"

"github.com/hashicorp/memberlist"
"github.com/hashicorp/raft"
"github.com/sethvargo/go-retry"
Expand Down Expand Up @@ -68,6 +69,8 @@ func (m *MemberList) MemberListInit(ctx context.Context) {
cfg.Name = m.options.Config.ServerID
cfg.BindAddr = m.options.Config.BindAddr
cfg.BindPort = int(m.options.Config.DiscoveryPort)
cfg.AdvertiseAddr = m.options.Config.AdvertiseAddr
cfg.AdvertisePort = int(m.options.Config.DiscoveryPort)
cfg.Delegate = NewDelegate(DelegateOpts{
config: m.options.Config,
broadcastQueue: m.broadcastQueue,
Expand Down Expand Up @@ -105,11 +108,11 @@ func (m *MemberList) MemberListInit(ctx context.Context) {
log.Fatal(err)
}

if m.options.Config.JoinAddr != "" {
if len(m.options.Config.JoinAddr) > 0 {
backoffPolicy := internal.RetryBackoff(retry.NewFibonacci(1*time.Second), 5, 200*time.Millisecond, 0, 0)

err = retry.Do(ctx, backoffPolicy, func(ctx context.Context) error {
_, err = list.Join([]string{m.options.Config.JoinAddr})
_, err = list.Join(m.options.Config.JoinAddr)
if err != nil {
return retry.RetryableError(err)
}
Expand All @@ -130,7 +133,7 @@ func (m *MemberList) broadcastRaftAddress() {
NodeMeta: NodeMeta{
ServerID: raft.ServerID(m.options.Config.ServerID),
RaftAddr: raft.ServerAddress(fmt.Sprintf("%s:%d",
m.options.Config.RaftBindAddr, m.options.Config.RaftBindPort)),
m.options.Config.RaftAdvertiseAddr, m.options.Config.RaftAdvertisePort)),
},
}
m.broadcastQueue.QueueBroadcast(&msg)
Expand All @@ -148,7 +151,7 @@ func (m *MemberList) ForwardDeleteKey(ctx context.Context, key string) {
NodeMeta: NodeMeta{
ServerID: raft.ServerID(m.options.Config.ServerID),
RaftAddr: raft.ServerAddress(fmt.Sprintf("%s:%d",
m.options.Config.BindAddr, m.options.Config.RaftBindPort)),
m.options.Config.RaftAdvertiseAddr, m.options.Config.RaftAdvertisePort)),
},
})
}
Expand All @@ -165,7 +168,7 @@ func (m *MemberList) ForwardDataMutation(ctx context.Context, cmd []byte) {
NodeMeta: NodeMeta{
ServerID: raft.ServerID(m.options.Config.ServerID),
RaftAddr: raft.ServerAddress(fmt.Sprintf("%s:%d",
m.options.Config.BindAddr, m.options.Config.RaftBindPort)),
m.options.Config.RaftAdvertiseAddr, m.options.Config.RaftAdvertisePort)),
},
})
}
Expand Down
5 changes: 3 additions & 2 deletions internal/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (r *Raft) RaftInit(ctx context.Context) {
}

bindAddr := fmt.Sprintf("%s:%d", conf.RaftBindAddr, conf.RaftBindPort)
advertiseAddr, err := net.ResolveTCPAddr("tcp", bindAddr)
advertiseAddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", conf.RaftAdvertiseAddr, conf.RaftAdvertisePort))
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -140,12 +140,13 @@ func (r *Raft) RaftInit(ctx context.Context) {

if conf.BootstrapCluster {
// Error can be safely ignored if we're already leader
address := raft.ServerAddress(fmt.Sprintf("%s:%d", conf.RaftAdvertiseAddr, conf.RaftAdvertisePort))
_ = raftServer.BootstrapCluster(raft.Configuration{
Servers: []raft.Server{
{
Suffrage: raft.Voter,
ID: raft.ServerID(conf.ServerID),
Address: raft.ServerAddress(conf.RaftBindAddr),
Address: address,
},
},
}).Error()
Expand Down
2 changes: 1 addition & 1 deletion sugardb/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
)

func (server *SugarDB) isInCluster() bool {
return server.config.BootstrapCluster || server.config.JoinAddr != ""
return server.config.BootstrapCluster || len(server.config.JoinAddr) > 0
}

func (server *SugarDB) raftApplyDeleteKey(ctx context.Context, key string) error {
Expand Down
35 changes: 31 additions & 4 deletions sugardb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,21 @@ func WithServerID(serverID string) func(sugardb *SugarDB) {
}
}

// WithJoinAddr is an option to the NewSugarDB function that allows you to pass a
// custom JoinAddr to SugarDB.
// WithJoinAddr is an option to the NewSugarDB function that allows you to pass
// custom JoinAddr addresses to SugarDB.
// If not specified, SugarDB will use the default configuration from config.DefaultConfig().
func WithJoinAddr(joinAddr string) func(sugardb *SugarDB) {
func WithJoinAddr(joinAddrs ...string) func(sugardb *SugarDB) {
return func(sugardb *SugarDB) {
sugardb.config.JoinAddr = joinAddr
sugardb.config.JoinAddr = joinAddrs
}
}

// WithAdvertiseAddr is an option to the NewSugarDB function that allows you to pass a
// custom AdvertiseAddr to SugarDB.
// If not specified, SugarDB will use the default configuration from config.DefaultConfig().
func WithAdvertiseAddr(advertiseAddr string) func(sugardb *SugarDB) {
return func(sugardb *SugarDB) {
sugardb.config.AdvertiseAddr = advertiseAddr
}
}

Expand Down Expand Up @@ -378,3 +387,21 @@ func WithRaftBindPort(raftBindPort uint16) func(sugardb *SugarDB) {
sugardb.config.RaftBindPort = raftBindPort
}
}

// WithRaftAdvertiseAddr is an option to the NewSugarDB function that allows you to pass a
// custom RaftAdvertiseAddr to SugarDB.
// If not specified, SugarDB will use the default configuration from config.DefaultConfig().
func WithRaftAdvertiseAddr(raftAdvertiseAddr string) func(sugardb *SugarDB) {
return func(sugardb *SugarDB) {
sugardb.config.RaftAdvertiseAddr = raftAdvertiseAddr
}
}

// WithRaftAdvertisePort is an option to the NewSugarDB function that allows you to pass a
// custom RaftAdvertisePort to SugarDB.
// If not specified, SugarDB will use the default configuration from config.DefaultConfig().
func WithRaftAdvertisePort(raftAdvertisePort uint16) func(sugardb *SugarDB) {
return func(sugardb *SugarDB) {
sugardb.config.RaftAdvertisePort = raftAdvertisePort
}
}
4 changes: 3 additions & 1 deletion sugardb/sugardb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ func setupServer(
conf.DataDir = dataDir
conf.ForwardCommand = forwardCommand
conf.BindAddr = bindAddr
conf.JoinAddr = joinAddr
if joinAddr != "" {
conf.JoinAddr = []string{joinAddr}
}
conf.Port = uint16(port)
conf.ServerID = serverId
conf.DiscoveryPort = uint16(discoveryPort)
Expand Down