mirror of
https://github.com/XTLS/Xray-core.git
synced 2026-01-14 14:47:01 +08:00
Compare commits
13 Commits
copilot/fi
...
copilot/fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
31d10f3544 | ||
|
|
1ad1608581 | ||
|
|
c00c697b65 | ||
|
|
4e0a87faf4 | ||
|
|
2d37e84d4d | ||
|
|
47a1e042e4 | ||
|
|
cc36c1b5bf | ||
|
|
ea3badc641 | ||
|
|
41050594e5 | ||
|
|
ecef77ff48 | ||
|
|
52f7f3d174 | ||
|
|
385867e82b | ||
|
|
a99fe66467 |
@@ -160,7 +160,7 @@ func (s *ClassicNameServer) getCacheController() *CacheController {
|
||||
}
|
||||
|
||||
// sendQuery implements CachedNameserver.
|
||||
func (s *ClassicNameServer) sendQuery(ctx context.Context, noResponseErrCh chan<- error, fqdn string, option dns_feature.IPOption) {
|
||||
func (s *ClassicNameServer) sendQuery(ctx context.Context, _ chan<- error, fqdn string, option dns_feature.IPOption) {
|
||||
errors.LogInfo(ctx, s.Name(), " querying DNS for: ", fqdn)
|
||||
|
||||
reqs := buildReqMsgs(fqdn, option, s.newReqID, genEDNS0Options(s.clientIP, 0))
|
||||
@@ -171,14 +171,7 @@ func (s *ClassicNameServer) sendQuery(ctx context.Context, noResponseErrCh chan<
|
||||
ctx: ctx,
|
||||
}
|
||||
s.addPendingRequest(udpReq)
|
||||
b, err := dns.PackMessage(req.msg)
|
||||
if err != nil {
|
||||
errors.LogErrorInner(ctx, err, "failed to pack dns query")
|
||||
if noResponseErrCh != nil {
|
||||
noResponseErrCh <- err
|
||||
}
|
||||
return
|
||||
}
|
||||
b, _ := dns.PackMessage(req.msg)
|
||||
copyDest := net.UDPDestination(s.address.Address, s.address.Port)
|
||||
b.UDP = ©Dest
|
||||
s.udpServer.Dispatch(toDnsContext(ctx, s.address.String()), *s.address, b)
|
||||
|
||||
@@ -124,6 +124,26 @@ type netBindClient struct {
|
||||
ctx context.Context
|
||||
dialer internet.Dialer
|
||||
reserved []byte
|
||||
|
||||
// Track all peer connections for unified reading
|
||||
connMutex sync.RWMutex
|
||||
conns map[*netEndpoint]net.Conn
|
||||
dataChan chan *receivedData
|
||||
closeChan chan struct{}
|
||||
closeOnce sync.Once
|
||||
}
|
||||
|
||||
const (
|
||||
// Buffer size for dataChan - allows some buffering of received packets
|
||||
// while dispatcher matches them with read requests
|
||||
dataChannelBufferSize = 100
|
||||
)
|
||||
|
||||
type receivedData struct {
|
||||
data []byte
|
||||
n int
|
||||
endpoint *netEndpoint
|
||||
err error
|
||||
}
|
||||
|
||||
func (bind *netBindClient) connectTo(endpoint *netEndpoint) error {
|
||||
@@ -133,34 +153,114 @@ func (bind *netBindClient) connectTo(endpoint *netEndpoint) error {
|
||||
}
|
||||
endpoint.conn = c
|
||||
|
||||
go func(readQueue <-chan *netReadInfo, endpoint *netEndpoint) {
|
||||
// Initialize channels on first connection
|
||||
bind.connMutex.Lock()
|
||||
if bind.conns == nil {
|
||||
bind.conns = make(map[*netEndpoint]net.Conn)
|
||||
bind.dataChan = make(chan *receivedData, dataChannelBufferSize)
|
||||
bind.closeChan = make(chan struct{})
|
||||
|
||||
// Start unified reader dispatcher
|
||||
go bind.unifiedReader()
|
||||
}
|
||||
bind.conns[endpoint] = c
|
||||
bind.connMutex.Unlock()
|
||||
|
||||
// Start a reader goroutine for this specific connection
|
||||
go func(conn net.Conn, endpoint *netEndpoint) {
|
||||
const maxPacketSize = 1500
|
||||
for {
|
||||
v, ok := <-readQueue
|
||||
if !ok {
|
||||
select {
|
||||
case <-bind.closeChan:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
buf := make([]byte, maxPacketSize)
|
||||
n, err := conn.Read(buf)
|
||||
|
||||
// Send only the valid data portion to dispatcher
|
||||
dataToSend := buf
|
||||
if n > 0 && n < len(buf) {
|
||||
dataToSend = buf[:n]
|
||||
}
|
||||
|
||||
// Send received data to dispatcher
|
||||
select {
|
||||
case bind.dataChan <- &receivedData{
|
||||
data: dataToSend,
|
||||
n: n,
|
||||
endpoint: endpoint,
|
||||
err: err,
|
||||
}:
|
||||
case <-bind.closeChan:
|
||||
return
|
||||
}
|
||||
i, err := c.Read(v.buff)
|
||||
|
||||
if i > 3 {
|
||||
v.buff[1] = 0
|
||||
v.buff[2] = 0
|
||||
v.buff[3] = 0
|
||||
}
|
||||
|
||||
v.bytes = i
|
||||
v.endpoint = endpoint
|
||||
v.err = err
|
||||
v.waiter.Done()
|
||||
|
||||
if err != nil {
|
||||
bind.connMutex.Lock()
|
||||
delete(bind.conns, endpoint)
|
||||
endpoint.conn = nil
|
||||
bind.connMutex.Unlock()
|
||||
return
|
||||
}
|
||||
}
|
||||
}(bind.readQueue, endpoint)
|
||||
}(c, endpoint)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// unifiedReader dispatches received data to waiting read requests
|
||||
func (bind *netBindClient) unifiedReader() {
|
||||
for {
|
||||
select {
|
||||
case data := <-bind.dataChan:
|
||||
// Bounds check to prevent panic
|
||||
if data.n > len(data.data) {
|
||||
data.n = len(data.data)
|
||||
}
|
||||
|
||||
// Wait for a read request with timeout to prevent blocking forever
|
||||
select {
|
||||
case v := <-bind.readQueue:
|
||||
// Copy data to request buffer
|
||||
n := copy(v.buff, data.data[:data.n])
|
||||
|
||||
// Clear reserved bytes if needed
|
||||
if n > 3 {
|
||||
v.buff[1] = 0
|
||||
v.buff[2] = 0
|
||||
v.buff[3] = 0
|
||||
}
|
||||
|
||||
v.bytes = n
|
||||
v.endpoint = data.endpoint
|
||||
v.err = data.err
|
||||
v.waiter.Done()
|
||||
case <-bind.closeChan:
|
||||
return
|
||||
}
|
||||
case <-bind.closeChan:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Close implements conn.Bind.Close for netBindClient
|
||||
func (bind *netBindClient) Close() error {
|
||||
// Use sync.Once to prevent double-close panic
|
||||
bind.closeOnce.Do(func() {
|
||||
bind.connMutex.Lock()
|
||||
if bind.closeChan != nil {
|
||||
close(bind.closeChan)
|
||||
}
|
||||
bind.connMutex.Unlock()
|
||||
})
|
||||
|
||||
// Call parent Close
|
||||
return bind.netBind.Close()
|
||||
}
|
||||
|
||||
func (bind *netBindClient) Send(buff [][]byte, endpoint conn.Endpoint) error {
|
||||
var err error
|
||||
|
||||
|
||||
@@ -114,6 +114,12 @@ func (h *Handler) processWireGuard(ctx context.Context, dialer internet.Dialer)
|
||||
}
|
||||
|
||||
// bind := conn.NewStdNetBind() // TODO: conn.Bind wrapper for dialer
|
||||
// Set workers to number of peers if not explicitly configured
|
||||
// This allows concurrent packet reception from multiple peers
|
||||
workers := int(h.conf.NumWorkers)
|
||||
if workers <= 0 && len(h.conf.Peers) > 0 {
|
||||
workers = len(h.conf.Peers)
|
||||
}
|
||||
h.bind = &netBindClient{
|
||||
netBind: netBind{
|
||||
dns: h.dns,
|
||||
@@ -121,9 +127,9 @@ func (h *Handler) processWireGuard(ctx context.Context, dialer internet.Dialer)
|
||||
IPv4Enable: h.hasIPv4,
|
||||
IPv6Enable: h.hasIPv6,
|
||||
},
|
||||
workers: int(h.conf.NumWorkers),
|
||||
workers: workers,
|
||||
},
|
||||
ctx: ctx,
|
||||
ctx: core.ToBackgroundDetachedContext(ctx),
|
||||
dialer: dialer,
|
||||
reserved: h.conf.Reserved,
|
||||
}
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
goerrors "errors"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/xtls/xray-core/common"
|
||||
"github.com/xtls/xray-core/common/buf"
|
||||
@@ -27,7 +26,6 @@ var nullDestination = net.TCPDestination(net.AnyIP, 0)
|
||||
type Server struct {
|
||||
bindServer *netBindServer
|
||||
|
||||
infoMu sync.RWMutex
|
||||
info routingInfo
|
||||
policyManager policy.Manager
|
||||
}
|
||||
@@ -80,14 +78,12 @@ func (*Server) Network() []net.Network {
|
||||
|
||||
// Process implements proxy.Inbound.
|
||||
func (s *Server) Process(ctx context.Context, network net.Network, conn stat.Connection, dispatcher routing.Dispatcher) error {
|
||||
s.infoMu.Lock()
|
||||
s.info = routingInfo{
|
||||
ctx: ctx,
|
||||
dispatcher: dispatcher,
|
||||
inboundTag: session.InboundFromContext(ctx),
|
||||
contentTag: session.ContentFromContext(ctx),
|
||||
}
|
||||
s.infoMu.Unlock()
|
||||
|
||||
ep, err := s.bindServer.ParseEndpoint(conn.RemoteAddr().String())
|
||||
if err != nil {
|
||||
@@ -124,23 +120,18 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn stat.Con
|
||||
}
|
||||
|
||||
func (s *Server) forwardConnection(dest net.Destination, conn net.Conn) {
|
||||
// Make a thread-safe copy of routing info
|
||||
s.infoMu.RLock()
|
||||
info := s.info
|
||||
s.infoMu.RUnlock()
|
||||
|
||||
if info.dispatcher == nil {
|
||||
errors.LogError(info.ctx, "unexpected: dispatcher == nil")
|
||||
if s.info.dispatcher == nil {
|
||||
errors.LogError(s.info.ctx, "unexpected: dispatcher == nil")
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
ctx, cancel := context.WithCancel(core.ToBackgroundDetachedContext(info.ctx))
|
||||
ctx, cancel := context.WithCancel(core.ToBackgroundDetachedContext(s.info.ctx))
|
||||
sid := session.NewID()
|
||||
ctx = c.ContextWithID(ctx, sid)
|
||||
inbound := session.Inbound{} // since promiscuousModeHandler mixed-up context, we shallow copy inbound (tag) and content (configs)
|
||||
if info.inboundTag != nil {
|
||||
inbound = *info.inboundTag
|
||||
if s.info.inboundTag != nil {
|
||||
inbound = *s.info.inboundTag
|
||||
}
|
||||
inbound.Name = "wireguard"
|
||||
inbound.CanSpliceCopy = 3
|
||||
@@ -150,8 +141,8 @@ func (s *Server) forwardConnection(dest net.Destination, conn net.Conn) {
|
||||
// Currently we have no way to link to the original source address
|
||||
inbound.Source = net.DestinationFromAddr(conn.RemoteAddr())
|
||||
ctx = session.ContextWithInbound(ctx, &inbound)
|
||||
if info.contentTag != nil {
|
||||
ctx = session.ContextWithContent(ctx, info.contentTag)
|
||||
if s.info.contentTag != nil {
|
||||
ctx = session.ContextWithContent(ctx, s.info.contentTag)
|
||||
}
|
||||
ctx = session.SubContextFromMuxInbound(ctx)
|
||||
|
||||
@@ -165,16 +156,7 @@ func (s *Server) forwardConnection(dest net.Destination, conn net.Conn) {
|
||||
Reason: "",
|
||||
})
|
||||
|
||||
// Set inbound and content tags from routing info for proper routing
|
||||
// These were commented out in PR #4030 but are needed for domain-based routing
|
||||
if info.inboundTag != nil {
|
||||
ctx = session.ContextWithInbound(ctx, info.inboundTag)
|
||||
}
|
||||
if info.contentTag != nil {
|
||||
ctx = session.ContextWithContent(ctx, info.contentTag)
|
||||
}
|
||||
|
||||
link, err := info.dispatcher.Dispatch(ctx, dest)
|
||||
link, err := s.info.dispatcher.Dispatch(ctx, dest)
|
||||
if err != nil {
|
||||
errors.LogErrorInner(ctx, err, "dispatch connection")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user