Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 160 additions & 24 deletions solrmonitor/solrmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,15 @@ type SolrMonitor struct {
solrRoot string // e.g. "/solr"
zkWatcher *ZkWatcherMan

mu sync.RWMutex
collections map[string]*collection // map of all currently-known collections
liveNodes []string // current set of live_nodes
queryNodes []string // current set of live_query_nodes
overseerNodes []string // current set of overseer nodes (from roles.json)
clusterProps map[string]string // current set of cluster props (from clusterprops.json)
solrEventListener SolrEventListener // to listen the solr cluster state
pathsToWatch map[string]struct{} // set of paths to always watch
mu sync.RWMutex
collections map[string]*collection // map of all currently-known collections
liveNodes []string // current set of live_nodes
queryNodes []string // current set of live_query_nodes
overseerNodes []string // current set of overseer nodes (from roles.json)
clusterProps map[string]string // current set of cluster props (from clusterprops.json)
solrEventListener SolrEventListener // to listen the solr cluster state
pathsToWatch map[string]struct{} // set of paths to always watch
usePersistentWatch bool
}

// Minimal interface solrmonitor needs (allows for mock ZK implementations).
Expand Down Expand Up @@ -110,7 +111,7 @@ func (c callbacks) ChildrenChanged(path string, children []string) error {
}

func (c callbacks) DataChanged(path string, data string, stat *zk.Stat) error {
if c.isPrsPath(path) { //special handling for PRS entry, since we are not fetching data on them
if c.usePersistentWatch && c.isPrsPath(path) { //special handling for PRS entry, since we are not fetching data on them
if stat.Version > -1 { //ignore deletion
return c.SolrMonitor.updateCollectionWithPrsPath(path)
}
Expand Down Expand Up @@ -229,13 +230,22 @@ func (c *SolrMonitor) childrenChanged(path string, children []string) error {
case c.solrRoot + liveQueryNodesPath:
return c.updateLiveQueryNodes(children)
default:
//since we use watch on /collections/<coll> and recursive watch on /collections/<coll>/state.json,
//since we use persistent watch on /collections/<coll> and recursive watch on /collections/<coll>/state.json,
//the ONLY change should be on init (for PRS entries)
if strings.Contains(path, c.solrRoot+"/collections/") {
return c.initCollectionWithPrsStrings(c.getCollFromPath(path), children)
//return fmt.Errorf("solrmonitor: unexpected childrenChanged on collections: %s", path)
if c.usePersistentWatch {
if strings.Contains(path, c.solrRoot+"/collections/") {
return c.initCollectionWithPrsStrings(c.getCollFromPath(path), children)
//return fmt.Errorf("solrmonitor: unexpected childrenChanged on collections: %s", path)
}
return nil
} else {
//collectionsPath + "/" + coll.name + "/state.json": we want to state.json children
if !strings.HasPrefix(path, c.solrRoot+"/collections/") || !strings.HasSuffix(path, "/state.json") {
return fmt.Errorf("solrmonitor: unknown childrenChanged: %s", path)
}
return c.updateCollection(path, children)
}
return nil

}
}

Expand Down Expand Up @@ -296,6 +306,102 @@ func (c *SolrMonitor) initCollectionWithPrsStrings(coll *collection, children []
return c.updateCollectionWithPrsStrings(coll, children)
}

// updateCollection (usePersistentWatch == false)
func (c *SolrMonitor) updateCollection(path string, children []string) error {
rs, err := c.updateCollectionState(path, children)

if err == nil && rs != nil && len(rs) > 0 {
coll := c.getCollFromPath(path)
if coll != nil {
c.mu.RLock()
defer c.mu.RUnlock()
if c.solrEventListener != nil {
collName := c.getCollNameFromPath(path)
c.solrEventListener.SolrCollectionReplicaStatesChanged(collName, rs)
}
}
}

return err
}

// updateCollectionState (usePersistentWatch == false)
func (c *SolrMonitor) updateCollectionState(path string, children []string) (map[string]*PerReplicaState, error) {
c.logger.Printf("updateCollectionState: path %s, children %d", path, len(children))
coll := c.getCollFromPath(path)
if coll == nil || len(children) == 0 {
//looks like we have not got the collection event yet; it should be safe to ignore it
return nil, nil
}

rmap := make(map[string]*PerReplicaState)

for _, r := range children {
replicaParts := strings.Split(r, ":")
if len(replicaParts) < 3 || len(replicaParts) > 4 {
c.logger.Printf("PRS protocol is wrong %s ", r)
panic(fmt.Sprintf("PRS protocol is in wrong format %s ", r))
}
version, err := strconv.ParseInt(replicaParts[1], 10, 32)
if err != nil {
c.logger.Printf("PRS protocol has wrong version %s ", r)
panic(fmt.Sprintf("PRS protocol has wrong version %s ", r))
}

prs := &PerReplicaState{
Name: replicaParts[0],
Version: int32(version),
}

switch replicaParts[2] {
case "A":
prs.State = "active"
case "D":
prs.State = "down"
case "R":
prs.State = "recovering"
case "F":
prs.State = "RECOVERY_FAILED"
default:
// marking inactive - as it should be recoverable error
c.logger.Printf("ERROR: PRS protocol UNKNOWN state %s ", replicaParts[2])
prs.State = "inactive"
}

prs.Leader = "false"
if len(replicaParts) == 4 {
prs.Leader = "true"
}

//keep ths latest prs
if currentPrs, found := rmap[prs.Name]; found {
if currentPrs.Version >= prs.Version {
continue
}
}

rmap[prs.Name] = prs
}
c.logger.Printf("updateCollectionState on collection %s: updating prs state %s", coll.name, rmap)
coll.mu.Lock()
defer coll.mu.Unlock()
//update the collection state based on new PRS (per replica state)
for _, shard := range coll.collectionState.Shards {
for rname, rstate := range shard.Replicas {
if prs, found := rmap[rname]; found {
if prs.Version < rstate.Version {
c.logger.Printf("WARNING: PRS update with lower version than received previously. Existing: %v, Update: %v", rstate, prs)
}
rstate.Version = prs.Version
rstate.Leader = prs.Leader
rstate.State = prs.State
}
}
}

return rmap, nil
}

// updateCollectionWithPrsPath does 2 things:
// 1.update the collection with PRS update(s) from the path
// 2.notify the solrEventListener of the PRS change, it provides the full PRS entries + the new updated one
Expand Down Expand Up @@ -400,8 +506,14 @@ func (c *SolrMonitor) shouldWatchChildren(path string) bool {
case c.solrRoot + liveQueryNodesPath:
return true
default:
//for PRS entries in /solr/collections/<collName>/state.json ... it's now using recursive watch
//for PRS entries in /solr/collections/<collName>/state.json ... it's now using permanent recursive watch
//There should not be any child watches used per collection anymore
if !c.usePersistentWatch && strings.HasPrefix(path, c.solrRoot+"/collections/") && strings.HasSuffix(path, "/state.json") {
coll := c.getCollFromPath(path)
if coll != nil {
return coll.isPRSEnabled()
}
}
return false
}
}
Expand Down Expand Up @@ -444,7 +556,12 @@ func (c *SolrMonitor) dataChanged(path string, data string, version int32) error
state := coll.setStateData(data, version)
c.callSolrListener(coll.name, state)

//No need to start monitoring replica status, as the recursive watch on state.json is already doing that
//Only need to start monitoring replica status if usePersistentWatch == false,
//As the recursive watch on state.json already watches all PRS entry changes
if !c.usePersistentWatch && coll.isPRSEnabled() {
coll.startMonitoringReplicaStatus()
}

} else if coll.isPRSEnabled() && c.isPrsPath(path) { //change on PRS entry
return fmt.Errorf("unexpected data change on PRS entry: %s", path)
} else {
Expand Down Expand Up @@ -571,16 +688,15 @@ func (c *SolrMonitor) updateCollections(collections []string) error {
// Now start any new collections.
var errCount int32
var wg sync.WaitGroup
for _, c := range added {
coll := c
for _, coll := range added {
wg.Add(1)
go func() {
go func(coll *collection) {
defer wg.Done()
if err := coll.start(); err != nil {
if err := coll.start(c.usePersistentWatch); err != nil {
coll.parent.logger.Printf("solrmonitor: error starting coll: %s: %s", coll.name, err)
atomic.AddInt32(&errCount, 1)
}
}()
}(coll)
}
wg.Wait()

Expand Down Expand Up @@ -702,15 +818,21 @@ func parseStateData(name string, data []byte, version int32) (*CollectionState,
return collState, nil
}

func (coll *collection) start() error {
func (coll *collection) start(usePersistentWatch bool) error {
collPath := coll.parent.solrRoot + "/collections/" + coll.name
statePath := collPath + "/state.json"
if err := coll.parent.zkWatcher.MonitorData(collPath); err != nil { //for init
return err
}

if err := coll.parent.zkWatcher.MonitorDataRecursive(statePath, 1); err != nil {
return err
if usePersistentWatch {
if err := coll.parent.zkWatcher.MonitorDataRecursive(statePath, 1); err != nil {
return err
}
} else {
if err := coll.parent.zkWatcher.MonitorData(statePath); err != nil {
return err
}
}

return nil
Expand Down Expand Up @@ -800,6 +922,20 @@ func (coll *collection) carryOverConfigName(newState *CollectionState) {
}
}

// startMonitoringReplicaStatus only used if usePersistentWatch == false
func (coll *collection) startMonitoringReplicaStatus() {
path := coll.parent.solrRoot + "/collections/" + coll.name + "/state.json"

// TODO: need to revisit coll.isWatched flag(if zk disconnects?). we need to create watch once only Scott?
if !coll.hasWatch() {
err := coll.parent.zkWatcher.MonitorChildren(path)
if err == nil {
coll.parent.logger.Printf("startMonitoringReplicaStatus: watching collection [%s] children for PRS", coll.name)
coll.watchAdded()
}
}
}

func (coll *collection) watchAdded() {
coll.mu.Lock()
defer coll.mu.Unlock()
Expand Down
23 changes: 13 additions & 10 deletions solrmonitor/solrmonitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func setup(t *testing.T) (*SolrMonitor, *testutil) {
logger := smtestutil.NewZkTestLogger(t)
watcher := NewZkWatcherMan(logger)
connOption := func(c *zk.Conn) { c.SetLogger(logger) }
conn, _, err := zk.Connect([]string{"127.0.0.1:2181"}, time.Second*5, connOption, zk.WithEventCallback(watcher.EventCallback))
conn, _, err := zk.Connect([]string{"127.0.0.1:2181"}, time.Hour*5, connOption, zk.WithEventCallback(watcher.EventCallback))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -205,7 +205,7 @@ func TestCollectionChanges(t *testing.T) {
// Get a fresh new solr monitor and make sure it starts in the right state.
w2 := NewZkWatcherMan(testutil.logger)
connOption := func(c *zk.Conn) { c.SetLogger(testutil.logger) }
conn2, _, err := zk.Connect([]string{"127.0.0.1:2181"}, time.Second*5, connOption, zk.WithEventCallback(w2.EventCallback))
conn2, _, err := zk.Connect([]string{"127.0.0.1:2181"}, time.Hour*5, connOption, zk.WithEventCallback(w2.EventCallback))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -251,7 +251,8 @@ func TestPRSProtocol(t *testing.T) {
currentFetchCount := testSetup.collectionStateFetchCount
shouldNotExist(t, sm, "c1")
checkCollectionStateCallback(t, 1, testSetup.solrEventListener.collStateEvents+testSetup.solrEventListener.collReplicaChangeEvents)
checkFetchCount(t, currentFetchCount, 2) //state.json fetch and 1 children fetch on PRS from coll.start()
//checkFetchCount(t, currentFetchCount, 2) //state.json fetch and 1 children fetch on PRS from coll.start()
checkFetchCount(t, currentFetchCount, 1) //state.json fetch

_, err = zkCli.Create(sm.solrRoot+"/collections/c1/state.json", nil, 0, zk.WorldACL(zk.PermAll))
if err != nil {
Expand All @@ -260,7 +261,8 @@ func TestPRSProtocol(t *testing.T) {

shouldNotExist(t, sm, "c1")
checkCollectionStateCallback(t, 2, testSetup.solrEventListener.collStateEvents+testSetup.solrEventListener.collReplicaChangeEvents)
checkFetchCount(t, currentFetchCount, 3) //state.json fetch from new state.json
//checkFetchCount(t, currentFetchCount, 3) //state.json fetch from new state.json
checkFetchCount(t, currentFetchCount, 2) //state.json fetch from new state.json

_, err = zkCli.Set(sm.solrRoot+"/collections/c1/state.json", []byte("{\"c1\":{\"perReplicaState\":\"true\", \"shards\":{\"shard_1\":{\"replicas\":{\"R1\":{\"core\":\"core1\", \"state\":\"down\"}}}}}}"), -1)
if err != nil {
Expand Down Expand Up @@ -307,7 +309,7 @@ func TestPRSProtocol(t *testing.T) {
t.Fatal(err)
}
prsShouldExist(t, sm, "c1", "shard_1", "R1", "active", "false", 2)
checkCollectionStateCallback(t, 6, testSetup.solrEventListener.collStateEvents+testSetup.solrEventListener.collReplicaChangeEvents)
checkCollectionStateCallback(t, 7, testSetup.solrEventListener.collStateEvents+testSetup.solrEventListener.collReplicaChangeEvents)

// 4. adding PRS for replica R1, version 3, state active and leader
_, err = zkCli.Create(sm.solrRoot+"/collections/c1/state.json/R1:3:A:L", nil, 0, zk.WorldACL(zk.PermAll))
Expand All @@ -319,32 +321,33 @@ func TestPRSProtocol(t *testing.T) {
t.Fatal(err)
}
prsShouldExist(t, sm, "c1", "shard_1", "R1", "active", "true", 3)
checkCollectionStateCallback(t, 7, testSetup.solrEventListener.collStateEvents+testSetup.solrEventListener.collReplicaChangeEvents)
checkCollectionStateCallback(t, 8, testSetup.solrEventListener.collStateEvents+testSetup.solrEventListener.collReplicaChangeEvents)

//5. split shard
_, err = zkCli.Set(sm.solrRoot+"/collections/c1/state.json", []byte("{\"c1\":{\"perReplicaState\":\"true\", \"shards\":{\"shard_1\":{\"replicas\":{\"R1\":{\"core\":\"core1\", \"state\":\"down\"}}}, \"shard_1_0\":{\"replicas\":{\"R1_0\":{\"core\":\"core1\", \"state\":\"down\"}}}, \"shard_1_1\":{\"replicas\":{\"R1_1\":{\"core\":\"core1\", \"state\":\"down\"}}}}}}"), -1)
if err != nil {
t.Fatal(err)
}
time.Sleep(5000 * time.Millisecond)
checkCollectionStateCallback(t, 8, testSetup.solrEventListener.collStateEvents+testSetup.solrEventListener.collReplicaChangeEvents)
checkFetchCount(t, currentFetchCount, 5) //state.json fetch from updated state.json
checkCollectionStateCallback(t, 9, testSetup.solrEventListener.collStateEvents+testSetup.solrEventListener.collReplicaChangeEvents)
//checkFetchCount(t, currentFetchCount, 5) //state.json fetch from updated state.json
checkFetchCount(t, currentFetchCount, 10) //state.json fetch from updated state.json + PRS child changes

// 6. replica R1_0 should exist
_, err = zkCli.Create(sm.solrRoot+"/collections/c1/state.json/R1_0:1:A:L", nil, 0, zk.WorldACL(zk.PermAll))
if err != nil {
t.Fatal(err)
}
prsShouldExist(t, sm, "c1", "shard_1_0", "R1_0", "active", "true", 1)
checkCollectionStateCallback(t, 9, testSetup.solrEventListener.collStateEvents+testSetup.solrEventListener.collReplicaChangeEvents)
checkCollectionStateCallback(t, 10, testSetup.solrEventListener.collStateEvents+testSetup.solrEventListener.collReplicaChangeEvents)

// 7. replica R1_1 should exist
_, err = zkCli.Create(sm.solrRoot+"/collections/c1/state.json/R1_1:1:A:L", nil, 0, zk.WorldACL(zk.PermAll))
if err != nil {
t.Fatal(err)
}
prsShouldExist(t, sm, "c1", "shard_1_1", "R1_1", "active", "true", 1)
checkCollectionStateCallback(t, 10, testSetup.solrEventListener.collStateEvents+testSetup.solrEventListener.collReplicaChangeEvents)
checkCollectionStateCallback(t, 11, testSetup.solrEventListener.collStateEvents+testSetup.solrEventListener.collReplicaChangeEvents)

if testSetup.solrEventListener.collStateEvents != 4 || testSetup.solrEventListener.collections != 1 {
t.Fatalf("Event listener didn't not get event for collection = %d, collectionstateEvents = %d", testSetup.solrEventListener.collections, testSetup.solrEventListener.collStateEvents)
Expand Down
Loading