diff --git a/server.go b/server.go index d43f583a28..bdca8ed893 100644 --- a/server.go +++ b/server.go @@ -249,6 +249,10 @@ func (s *Server) initialize(interceptorChain *rpcperms.InterceptorChain) error { if err := s.cfg.AuxInvoiceManager.Start(); err != nil { return fmt.Errorf("unable to start aux invoice mgr: %w", err) } + if err := s.cfg.AuxChanCloser.Start(); err != nil { + return fmt.Errorf("unable to start aux chan closer: %w", + err) + } if err := s.cfg.AuxSweeper.Start(); err != nil { return fmt.Errorf("unable to start aux sweeper mgr: %w", err) } @@ -864,6 +868,10 @@ func (s *Server) Stop() error { err) } + if err := s.cfg.AuxChanCloser.Stop(); err != nil { + return fmt.Errorf("unable to stop aux chan closer: %w", + err) + } if err := s.cfg.AuxLeafSigner.Stop(); err != nil { return err } diff --git a/tapchannel/aux_closer.go b/tapchannel/aux_closer.go index d41f58edc0..10a7a8911c 100644 --- a/tapchannel/aux_closer.go +++ b/tapchannel/aux_closer.go @@ -97,11 +97,18 @@ type assetCloseInfo struct { // AuxChanCloser is used to implement asset-aware co-op close for channels. type AuxChanCloser struct { + startOnce sync.Once + stopOnce sync.Once + cfg AuxChanCloserCfg sync.RWMutex closeInfo map[wire.OutPoint]*assetCloseInfo + + // ContextGuard provides a wait group and main quit channel that can + // be used to create guarded contexts. + *fn.ContextGuard } // NewAuxChanCloser creates a new instance of the auxiliary channel closer. @@ -109,9 +116,35 @@ func NewAuxChanCloser(cfg AuxChanCloserCfg) *AuxChanCloser { return &AuxChanCloser{ cfg: cfg, closeInfo: make(map[wire.OutPoint]*assetCloseInfo), + ContextGuard: &fn.ContextGuard{ + DefaultTimeout: DefaultTimeout, + Quit: make(chan struct{}), + }, } } +// Start attempts to start the auxiliary channel closer. +func (a *AuxChanCloser) Start() error { + var startErr error + a.startOnce.Do(func() { + log.Info("Starting aux chan closer") + }) + return startErr +} + +// Stop signals for the auxiliary channel closer to gracefully exit. +func (a *AuxChanCloser) Stop() error { + var stopErr error + a.stopOnce.Do(func() { + log.Info("Stopping aux chan closer") + + close(a.Quit) + a.Wg.Wait() + }) + + return stopErr +} + // createCloseAlloc is a helper function that creates an allocation for an asset // close. This does not set a script key, as the script key will be set for each // packet after the coins have been distributed. @@ -738,8 +771,14 @@ func (a *AuxChanCloser) FinalizeClose(desc types.AuxCloseDesc, ChainLookupGen: a.cfg.ChainBridge, IgnoreChecker: a.cfg.IgnoreChecker, } + ctx, cancel := a.WithCtxQuitNoTimeout() + defer cancel() + + a.Wg.Add(1) + defer a.Wg.Done() + err = importOutputProofs( - desc.ShortChanID, fundingInputProofs, + ctx, desc.ShortChanID, fundingInputProofs, a.cfg.DefaultCourierAddr, a.cfg.ProofFetcher, a.cfg.ChainBridge, vCtx, a.cfg.ProofArchive, ) diff --git a/tapchannel/aux_sweeper.go b/tapchannel/aux_sweeper.go index 5af5255b8f..2cd2906aad 100644 --- a/tapchannel/aux_sweeper.go +++ b/tapchannel/aux_sweeper.go @@ -1400,7 +1400,7 @@ func (a *AuxSweeper) importOutputScriptKeys(desc tapscriptSweepDescs) error { // importOutputProofs imports the output proofs into the pending asset funding // into our local database. This preps us to be able to detect force closes. -func importOutputProofs(scid lnwire.ShortChannelID, +func importOutputProofs(ctx context.Context, scid lnwire.ShortChannelID, outputProofs []*proof.Proof, courierAddr *url.URL, proofDispatch proof.CourierDispatch, chainBridge tapgarden.ChainBridge, vCtx proof.VerifierCtx, proofArchive proof.Archiver) error { @@ -1415,7 +1415,6 @@ func importOutputProofs(scid lnwire.ShortChannelID, // the funding outputs we need. // // TODO(roasbeef): assume single asset for now, also additional inputs - ctxb := context.Background() for _, proofToImport := range outputProofs { // Check if the proof is already imported to avoid redundant // work. @@ -1424,7 +1423,7 @@ func importOutputProofs(scid lnwire.ShortChannelID, ScriptKey: *proofToImport.Asset.ScriptKey.PubKey, OutPoint: fn.Ptr(proofToImport.OutPoint()), } - proofExists, err := proofArchive.HasProof(ctxb, fundingLocator) + proofExists, err := proofArchive.HasProof(ctx, fundingLocator) if err != nil { return fmt.Errorf("unable to check if proof "+ "exists: %w", err) @@ -1463,7 +1462,7 @@ func importOutputProofs(scid lnwire.ShortChannelID, // First, we'll make a courier to use in fetching the proofs we // need. proofFetcher, err := proofDispatch.NewCourier( - ctxb, courierAddr, true, + ctx, courierAddr, true, ) if err != nil { return fmt.Errorf("unable to create proof courier: %w", @@ -1476,7 +1475,7 @@ func importOutputProofs(scid lnwire.ShortChannelID, Amount: proofToImport.Asset.Amount, } prefixProof, err := proofFetcher.ReceiveProof( - ctxb, recipient, inputProofLocator, + ctx, recipient, inputProofLocator, ) // Always attempt to close the courier, even if we encounter an @@ -1496,7 +1495,7 @@ func importOutputProofs(scid lnwire.ShortChannelID, // the transition proof to include the proper block+merkle proof // information. err = updateProofsFromShortChanID( - ctxb, chainBridge, scid, []*proof.Proof{proofToImport}, + ctx, chainBridge, scid, []*proof.Proof{proofToImport}, ) if err != nil { return fmt.Errorf("error updating transition "+ @@ -1524,7 +1523,7 @@ func importOutputProofs(scid lnwire.ShortChannelID, } err = proofArchive.ImportProofs( - ctxb, vCtx, false, &proof.AnnotatedProof{ + ctx, vCtx, false, &proof.AnnotatedProof{ Locator: fundingLocator, Blob: finalProofBuf.Bytes(), }, @@ -1597,7 +1596,7 @@ func (a *AuxSweeper) importCommitTx(req lnwallet.ResolutionReq, IgnoreChecker: a.cfg.IgnoreChecker, } err = importOutputProofs( - req.ShortChanID, maps.Values(fundingInputProofs), + ctxb, req.ShortChanID, maps.Values(fundingInputProofs), a.cfg.DefaultCourierAddr, a.cfg.ProofFetcher, a.cfg.ChainBridge, vCtx, a.cfg.ProofArchive, )