diff --git a/go.mod b/go.mod index 8dcb6e37..f967a5a3 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/livekit/protocol v1.45.1 github.com/livekit/psrpc v0.7.1 github.com/livekit/server-sdk-go/v2 v2.16.1 - github.com/livekit/sipgo v0.13.2-0.20260325054441-8dee0d1c8190 + github.com/livekit/sipgo v0.13.2-0.20260407210901-862b5e0eaf3f github.com/mjibson/go-dsp v0.0.0-20180508042940-11479a337f12 github.com/ory/dockertest/v3 v3.12.0 github.com/pion/rtp v1.10.1 diff --git a/go.sum b/go.sum index af7bd89d..19a2055e 100644 --- a/go.sum +++ b/go.sum @@ -142,8 +142,8 @@ github.com/livekit/psrpc v0.7.1 h1:ms37az0QTD3UXIWuUC5D/SkmKOlRMVRsI261eBWu/Vw= github.com/livekit/psrpc v0.7.1/go.mod h1:bZ4iHFQptTkbPnB0LasvRNu/OBYXEu1NA6O5BMFo9kk= github.com/livekit/server-sdk-go/v2 v2.16.1 h1:ZkIA9OdVvQ6Up1uW/RtQ0YJUgYMJ6+ywOmDg0jX7bTg= github.com/livekit/server-sdk-go/v2 v2.16.1/go.mod h1:oQbYijcbPzfjBAOzoq7tz9Ktqur8JNRCd923VP8xOQQ= -github.com/livekit/sipgo v0.13.2-0.20260325054441-8dee0d1c8190 h1:faSHy5wbIRo9RXYtc4cV7M89GsXbbv69TIdkyVnMH2E= -github.com/livekit/sipgo v0.13.2-0.20260325054441-8dee0d1c8190/go.mod h1:aDa6mbFktNzA1D917RhFlIB5IOfNBTmrwt+/lX960j0= +github.com/livekit/sipgo v0.13.2-0.20260407210901-862b5e0eaf3f h1:08dkYJIR1GE1DNCKN7bIrgX+/nR+CttdFqcwZSYal2E= +github.com/livekit/sipgo v0.13.2-0.20260407210901-862b5e0eaf3f/go.mod h1:aDa6mbFktNzA1D917RhFlIB5IOfNBTmrwt+/lX960j0= github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= github.com/mackerelio/go-osstat v0.2.5/go.mod h1:atxwWF+POUZcdtR1wnsUcQxTytoHG4uhl2AKKzrOajY= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= diff --git a/pkg/config/config.go b/pkg/config/config.go index e59c9d29..e1dd76fd 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -74,21 +74,22 @@ type Config struct { ApiSecret string `yaml:"api_secret"` // required (env LIVEKIT_API_SECRET) WsUrl string `yaml:"ws_url"` // required (env LIVEKIT_WS_URL) - HealthPort int `yaml:"health_port"` - PrometheusPort int `yaml:"prometheus_port"` - PProfPort int `yaml:"pprof_port"` - SIPPort int `yaml:"sip_port"` // announced SIP signaling port - SIPPortListen int `yaml:"sip_port_listen"` // SIP signaling port to listen on - SIPHostname string `yaml:"sip_hostname"` - SIPRingingInterval time.Duration `yaml:"sip_ringing_interval"` // from 1 sec up to 60 (default '1s') - TCP *TCPConfig `yaml:"tcp"` - TLS *TLSConfig `yaml:"tls"` - RTPPort rtcconfig.PortRange `yaml:"rtp_port"` - Logging logger.Config `yaml:"logging"` - ClusterID string `yaml:"cluster_id"` // cluster this instance belongs to - MaxCpuUtilization float64 `yaml:"max_cpu_utilization"` - MaxActiveCalls int `yaml:"max_active_calls"` // if set, used for affinity-based routing - SIPTrunkIds []string `yaml:"sip_trunk_ids"` // if set, only accept calls for these trunk IDs + HealthPort int `yaml:"health_port"` + PrometheusPort int `yaml:"prometheus_port"` + PProfPort int `yaml:"pprof_port"` + SIPPort int `yaml:"sip_port"` // announced SIP signaling port + SIPPortListen int `yaml:"sip_port_listen"` // SIP signaling port to listen on + SIPHostname string `yaml:"sip_hostname"` + OutboundRouteHeaders []string `yaml:"outbound_route_headers"` // Route headers prepended to outbound requests, e.g. "" + SIPRingingInterval time.Duration `yaml:"sip_ringing_interval"` // from 1 sec up to 60 (default '1s') + TCP *TCPConfig `yaml:"tcp"` + TLS *TLSConfig `yaml:"tls"` + RTPPort rtcconfig.PortRange `yaml:"rtp_port"` + Logging logger.Config `yaml:"logging"` + ClusterID string `yaml:"cluster_id"` // cluster this instance belongs to + MaxCpuUtilization float64 `yaml:"max_cpu_utilization"` + MaxActiveCalls int `yaml:"max_active_calls"` // if set, used for affinity-based routing + SIPTrunkIds []string `yaml:"sip_trunk_ids"` // if set, only accept calls for these trunk IDs UseExternalIP bool `yaml:"use_external_ip"` LocalNet string `yaml:"local_net"` // local IP net to use, e.g. 192.168.0.0/24 diff --git a/pkg/sip/features.go b/pkg/sip/features.go index 7b3bf4d3..e27299c9 100644 --- a/pkg/sip/features.go +++ b/pkg/sip/features.go @@ -1,3 +1,6 @@ package sip -const signalLoggingFeatureFlag = "sip.signal_logging" +const ( + signalLoggingFeatureFlag = "sip.signal_logging" + outboundRouteHeadersFeatureFlag = "sip.outbound_route_headers" +) diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 2409ba32..844fa91d 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -1008,7 +1008,7 @@ func (c *inboundCall) runMediaConn(tid traceid.ID, offerData []byte, enc livekit c.mon.SDPSize(len(answerData), false) c.log().Debugw("SDP answer", "sdp", string(answerData)) - mconf.Processor = c.s.handler.GetMediaProcessor(features, featureFlags, c.call.LkCallId) + mconf.Processor = c.s.handler.GetMediaProcessor(features, featureFlags, string(c.cc.ID())) if err = c.media.SetConfig(mconf); err != nil { return nil, err } @@ -1869,6 +1869,14 @@ func (c *sipInbound) swapSrcDst(req *sip.Request) { for req.RemoveHeader("Via") { } req.PrependHeader(c.generateViaHeader(req)) + + rrHdrs := req.GetHeaders("Record-Route") + for _, hdr := range rrHdrs { + req.PrependHeader(&sip.RouteHeader{Address: hdr.(*sip.RecordRouteHeader).Address}) + } + // Remove all Record-Route headers + for req.RemoveHeader("Record-Route") { + } } func (c *sipInbound) generateViaHeader(req *sip.Request) *sip.ViaHeader { @@ -1903,7 +1911,7 @@ func (c *sipInbound) sendBye(ctx context.Context) { ctx, span := Tracer.Start(ctx, "sip.inbound.sendBye") defer span.End() // This function is for clients, so we need to swap src and dest - r := sip.NewByeRequest(c.invite, c.inviteOk, nil, false) + r := sip.NewByeRequest(c.invite, c.inviteOk, nil) for k, v := range c.fillHeaders(nil) { r.AppendHeader(sip.NewHeader(k, v)) } @@ -1911,10 +1919,7 @@ func (c *sipInbound) sendBye(ctx context.Context) { c.setCSeq(r) c.swapSrcDst(r) c.drop() - _, err := sendTxRequest(ctx, c, r) - if err != nil { - c.log.Errorw("error sending BYE", err) - } + sendAndACK(ctx, c, r) } func (c *sipInbound) sendStatus(ctx context.Context, code sip.StatusCode, status string) { @@ -1962,7 +1967,7 @@ func (c *sipInbound) newReferReq(transferTo string, headers map[string]string) ( headers = c.fillHeaders(headers) // This will effectively redirect future SIP requests to this server instance (if host address is not LB). - req := NewReferRequest(c.invite, c.inviteOk, c.contact, transferTo, headers, false) + req := NewReferRequest(c.invite, c.inviteOk, c.contact, transferTo, headers) c.setCSeq(req) c.swapSrcDst(req) diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go index e3f65a5c..d10773c5 100644 --- a/pkg/sip/outbound.go +++ b/pkg/sip/outbound.go @@ -135,6 +135,9 @@ func (c *Client) newCall(ctx context.Context, tid traceid.ID, conf *config.Confi } return AttrsToHeaders(r.LocalParticipant.Attributes(), c.sipConf.attrsToHeaders, headers) }) + if sipConf.featureFlags[outboundRouteHeadersFeatureFlag] == "true" { + call.cc.routeHeaders = conf.OutboundRouteHeaders + } call.mon = c.mon.NewCall(stats.Outbound, sipConf.host, sipConf.address) var err error @@ -769,11 +772,12 @@ func (c *Client) newOutbound(log logger.Logger, id LocalTag, from, contact URI, } type sipOutbound struct { - log logger.Logger - c *Client - id LocalTag - from *sip.FromHeader - contact *sip.ContactHeader + log logger.Logger + c *Client + id LocalTag + from *sip.FromHeader + contact *sip.ContactHeader + routeHeaders []string mu sync.RWMutex tag RemoteTag @@ -994,6 +998,15 @@ authLoop: req.Recipient.Port = 5060 } } + + // We currently don't plumb the request back to caller to construct the ACK with. + // Thus, we need to modify the request to update any route sets. + for req.RemoveHeader("Route") { + } + for _, hdr := range resp.GetHeaders("Record-Route") { + req.PrependHeader(&sip.RouteHeader{Address: hdr.(*sip.RecordRouteHeader).Address}) + } + return c.inviteOk.Body(), nil } @@ -1012,7 +1025,7 @@ func (c *sipOutbound) AckInviteOK(ctx context.Context) error { if c.invite == nil || c.inviteOk == nil { return errors.New("call already closed") } - return c.c.sipCli.WriteRequest(sip.NewAckRequest(c.invite, c.inviteOk, nil, true)) + return c.c.sipCli.WriteRequest(sip.NewAckRequest(c.invite, c.inviteOk, nil)) } func (c *sipOutbound) attemptInvite(ctx context.Context, callID sip.CallIDHeader, to *sip.ToHeader, offer []byte, authHeaderName, authHeader string, headers Headers, setState sipRespFunc) (*sip.Request, *sip.Response, error) { @@ -1038,6 +1051,10 @@ func (c *sipOutbound) attemptInvite(ctx context.Context, callID sip.CallIDHeader req.AppendHeader(h) } + for _, route := range c.routeHeaders { + req.PrependHeader(sip.NewHeader("Route", route)) + } + tx, err := c.c.sipCli.TransactionRequest(req) if err != nil { return nil, nil, err @@ -1087,7 +1104,7 @@ func (c *sipOutbound) sendBye(ctx context.Context) { } ctx, span := Tracer.Start(ctx, "sip.outbound.sendBye") defer span.End() - r := sip.NewByeRequest(c.invite, c.inviteOk, nil, true) + r := sip.NewByeRequest(c.invite, c.inviteOk, nil) r.AppendHeader(sip.NewHeader("User-Agent", "LiveKit")) if c.getHeaders != nil { for k, v := range c.getHeaders(nil) { @@ -1101,10 +1118,7 @@ func (c *sipOutbound) sendBye(ctx context.Context) { } c.setCSeq(r) c.drop() - _, err := sendTxRequest(ctx, c, r) - if err != nil { - c.log.Errorw("error sending BYE", err) - } + sendAndACK(ctx, c, r) } func (c *sipOutbound) sendCancel(ctx context.Context) { @@ -1154,7 +1168,7 @@ func (c *sipOutbound) transferCall(ctx context.Context, transferTo string, heade headers = c.getHeaders(headers) } - req := NewReferRequest(c.invite, c.inviteOk, c.contact, transferTo, headers, true) + req := NewReferRequest(c.invite, c.inviteOk, c.contact, transferTo, headers) c.setCSeq(req) cseq := req.CSeq() diff --git a/pkg/sip/protocol.go b/pkg/sip/protocol.go index 584f17d8..b4ff7762 100644 --- a/pkg/sip/protocol.go +++ b/pkg/sip/protocol.go @@ -194,16 +194,22 @@ func getContactURI(c *config.Config, ip netip.Addr, t Transport) URI { } } -func sendTxRequest(ctx context.Context, c Signaling, req *sip.Request) (*sip.Response, error) { +func sendAndACK(ctx context.Context, c Signaling, req *sip.Request) { tx, err := c.Transaction(req) if err != nil { - return nil, err + return } defer tx.Terminate() - return sipResponse(ctx, tx, nil, nil) + r, err := sipResponse(ctx, tx, nil, nil) + if err != nil { + return + } + if r.StatusCode == 200 { + _ = c.WriteRequest(sip.NewAckRequest(req, r, nil)) + } } -func NewReferRequest(inviteRequest *sip.Request, inviteResponse *sip.Response, contactHeader *sip.ContactHeader, referToUrl string, headers map[string]string, isUAC bool) *sip.Request { +func NewReferRequest(inviteRequest *sip.Request, inviteResponse *sip.Response, contactHeader *sip.ContactHeader, referToUrl string, headers map[string]string) *sip.Request { req := sip.NewRequest(sip.REFER, inviteRequest.Recipient) req.SipVersion = inviteRequest.SipVersion @@ -214,7 +220,20 @@ func NewReferRequest(inviteRequest *sip.Request, inviteResponse *sip.Response, c viaHop.Params.Add("branch", sip.GenerateBranch()) // } - sip.CopyRoutingHeaders(req, inviteRequest, inviteResponse, isUAC) + if len(inviteRequest.GetHeaders("Route")) > 0 { + sip.CopyHeaders("Route", inviteRequest, req) + } else { + hdrs := inviteResponse.GetHeaders("Record-Route") + for i := len(hdrs) - 1; i >= 0; i-- { + rrh, ok := hdrs[i].(*sip.RecordRouteHeader) + if !ok { + continue + } + + h := rrh.Clone() + req.AppendHeader(h) + } + } maxForwardsHeader := sip.MaxForwardsHeader(70) req.AppendHeader(&maxForwardsHeader) diff --git a/pkg/sip/signaling_test.go b/pkg/sip/signaling_test.go index 20144c32..733d8f8b 100644 --- a/pkg/sip/signaling_test.go +++ b/pkg/sip/signaling_test.go @@ -283,7 +283,7 @@ func (s *sipUATest) TransactionRequest(t *testing.T, req *sip.Request, isFromUAC resp := getFinalResponseOrFail(t, tx, req) if req.Method == sip.INVITE && resp.StatusCode < 300 { // Need to send ACK for 2xx INVITE, sipgo already sends ACK for 3xx+ - ack := sip.NewAckRequest(req, resp, nil, isFromUAC) + ack := sip.NewAckRequest(req, resp, nil) err = s.Client.WriteRequest(ack) require.NoError(t, err) } diff --git a/pkg/siptest/client.go b/pkg/siptest/client.go index 6648d63f..20ac6438 100644 --- a/pkg/siptest/client.go +++ b/pkg/siptest/client.go @@ -349,8 +349,12 @@ func (c *Client) Dial(ip string, host string, number string, headers map[string] } } + for _, hdr := range resp.GetHeaders("Record-Route") { + req.PrependHeader(&sip.RouteHeader{Address: hdr.(*sip.RecordRouteHeader).Address}) + } + c.mediaConn.EnableTimeout(true) - if err = c.sipClient.WriteRequest(sip.NewAckRequest(req, resp, nil, true)); err != nil { + if err = c.sipClient.WriteRequest(sip.NewAckRequest(req, resp, nil)); err != nil { return err } ip, port, err := parseSDPAnswer(resp.Body()) @@ -412,7 +416,7 @@ func (c *Client) attemptInvite(ip, host, number string, offer []byte, authHeader func (c *Client) sendBye() { c.log.Debug("sending bye") - req := sip.NewByeRequest(c.inviteReq, c.inviteResp, nil, true) + req := sip.NewByeRequest(c.inviteReq, c.inviteResp, nil) req.AppendHeader(sip.NewHeader("User-Agent", "LiveKit")) cseq := c.lastCSeq.Add(1) @@ -427,7 +431,10 @@ func (c *Client) sendBye() { select { case <-c.ack: case <-tx.Done(): - case <-tx.Responses(): + case r := <-tx.Responses(): + if r.StatusCode == 200 { + _ = c.sipClient.WriteRequest(sip.NewAckRequest(req, r, nil)) + } } }