Compare commits

..

6 Commits

Author SHA1 Message Date
风扇滑翔翼
7dc1c4fc5b Fix mux 2025-12-23 11:43:44 +00:00
风扇滑翔翼
ea5a53ce6d Try fix 2025-12-23 11:18:54 +00:00
风扇滑翔翼
c6a76ff281 format 2025-12-22 05:13:52 +00:00
风扇滑翔翼
42214c3287 Use bufferSize to calc channel buffer 2025-12-22 02:47:03 +00:00
风扇滑翔翼
8c0d32f6f1 Magic 2025-12-21 18:00:13 +00:00
风扇滑翔翼
d5f17ab4fc Try to optimize pipe performance 2025-12-11 11:59:22 +00:00
18 changed files with 186 additions and 66 deletions

View File

@@ -13,7 +13,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Restore Geodat Cache
uses: actions/cache/restore@v5
uses: actions/cache/restore@v4
with:
path: resources
key: xray-geodat-
@@ -101,7 +101,7 @@ jobs:
# go build -o build_assets/wxray.exe -trimpath -buildvcs=false -gcflags="all=-l=4" -ldflags="-H windowsgui -X github.com/xtls/xray-core/core.build=${COMMID} -s -w -buildid=" -v ./main
- name: Restore Geodat Cache
uses: actions/cache/restore@v5
uses: actions/cache/restore@v4
with:
path: resources
key: xray-geodat-
@@ -132,7 +132,7 @@ jobs:
mv build_assets Xray-${{ env.ASSET_NAME }}
- name: Upload files to Artifacts
uses: actions/upload-artifact@v6
uses: actions/upload-artifact@v5
with:
name: Xray-${{ env.ASSET_NAME }}
path: |

View File

@@ -13,7 +13,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Restore Geodat Cache
uses: actions/cache/restore@v5
uses: actions/cache/restore@v4
with:
path: resources
key: xray-geodat-
@@ -207,7 +207,7 @@ jobs:
fi
- name: Restore Geodat Cache
uses: actions/cache/restore@v5
uses: actions/cache/restore@v4
with:
path: resources
key: xray-geodat-
@@ -238,7 +238,7 @@ jobs:
mv build_assets Xray-${{ env.ASSET_NAME }}
- name: Upload files to Artifacts
uses: actions/upload-artifact@v6
uses: actions/upload-artifact@v5
with:
name: Xray-${{ env.ASSET_NAME }}
path: |

View File

@@ -25,7 +25,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Restore Geodat Cache
uses: actions/cache/restore@v5
uses: actions/cache/restore@v4
with:
path: resources
key: xray-geodat-
@@ -58,7 +58,7 @@ jobs:
done
- name: Save Geodat Cache
uses: actions/cache/save@v5
uses: actions/cache/save@v4
if: ${{ steps.update.outputs.unhit }}
with:
path: resources

View File

@@ -10,7 +10,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Restore Geodat Cache
uses: actions/cache/restore@v5
uses: actions/cache/restore@v4
with:
path: resources
key: xray-geodat-
@@ -52,7 +52,7 @@ jobs:
go-version-file: go.mod
check-latest: true
- name: Restore Geodat Cache
uses: actions/cache/restore@v5
uses: actions/cache/restore@v4
with:
path: resources
key: xray-geodat-

View File

@@ -2,10 +2,12 @@ package buf
import (
"io"
"sync"
"time"
"github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/common/signal"
"github.com/xtls/xray-core/features/policy"
"github.com/xtls/xray-core/features/stats"
)
@@ -113,7 +115,12 @@ func Copy(reader Reader, writer Writer, options ...CopyOption) error {
for _, option := range options {
option(&handler)
}
err := copyInternal(reader, writer, &handler)
var err error
if shouldUseCopyV(reader, writer) {
err = copyVInternal(reader, writer, &handler)
} else {
err = copyInternal(reader, writer, &handler)
}
if err != nil && errors.Cause(err) != io.EOF {
return err
}
@@ -133,3 +140,112 @@ func CopyOnceTimeout(reader Reader, writer Writer, timeout time.Duration) error
}
return writer.WriteMultiBuffer(mb)
}
func shouldUseCopyV(reader Reader, writer Writer) bool {
// if writer is not support writeV, directly return false
if _, ok := writer.(*BufferToBytesWriter); !ok {
return false
}
// try to figure out if the underlying reader is SingleReader
var doCopyV bool
if tr, ok := reader.(*TimeoutWrapperReader); ok {
if _, ok := tr.Reader.(*SingleReader); ok {
doCopyV = true
}
}
if _, ok := reader.(*SingleReader); ok {
doCopyV = true
}
return doCopyV
}
func copyVInternal(r Reader, w Writer, handler *copyHandler) error {
// channel buffer size is maxBuffer/maxPerPacketLen (ignore the case of many small packets)
// default buffer size:
// 0 in ARM MIPS MIPSLE
// 4kb in ARM64 MIPS64 MIPS64LE
// 512kb in others
channelBuffer := (policy.SessionDefault().Buffer.PerConnection) / Size
if channelBuffer <= 0 {
channelBuffer = 4
}
cache := make(chan *Buffer, channelBuffer)
stopRead := make(chan struct{})
var rErr error
var wErr error
wg := sync.WaitGroup{}
wg.Add(2)
// downlink
go func() {
defer wg.Done()
defer close(cache)
for {
mb, err := r.ReadMultiBuffer()
for _, b := range mb {
if err == nil {
select {
case cache <- b:
// must be write error
case <-stopRead:
b.Release()
return
}
} else {
rErr = err
select {
case cache <- b:
case <-stopRead:
b.Release()
}
return
}
}
}
}()
// uplink
go func() {
defer wg.Done()
for {
b, ok := <-cache
if !ok {
return
}
var buffers = []*Buffer{b}
for stop := false; !stop; {
select {
case b, ok := <-cache:
if !ok {
stop = true
continue
}
buffers = append(buffers, b)
default:
stop = true
}
}
mb := MultiBuffer(buffers)
err := w.WriteMultiBuffer(mb)
for _, handler := range handler.onData {
handler(mb)
}
ReleaseMulti(mb)
if err != nil {
wErr = err
close(stopRead)
return
}
}
}()
wg.Wait()
// drain cache
for b := range cache {
b.Release()
}
if wErr != nil {
return writeError{wErr}
}
if rErr != nil {
return readError{rErr}
}
return nil
}

View File

@@ -159,6 +159,11 @@ func (r *SingleReader) ReadMultiBuffer() (MultiBuffer, error) {
return MultiBuffer{b}, err
}
func (r *SingleReader) readBuffer() (*Buffer, error) {
b, err := ReadBuffer(r.Reader)
return b, err
}
// PacketReader is a Reader that read one Buffer every time.
type PacketReader struct {
io.Reader

4
go.mod
View File

@@ -8,7 +8,7 @@ require (
github.com/golang/mock v1.7.0-rc.1
github.com/google/go-cmp v0.7.0
github.com/gorilla/websocket v1.5.3
github.com/miekg/dns v1.1.69
github.com/miekg/dns v1.1.68
github.com/pelletier/go-toml v1.9.5
github.com/pires/go-proxyproto v0.8.1
github.com/quic-go/quic-go v0.57.1
@@ -27,7 +27,7 @@ require (
golang.org/x/sys v0.39.0
golang.zx2c4.com/wireguard v0.0.0-20231211153847-12269c276173
google.golang.org/grpc v1.77.0
google.golang.org/protobuf v1.36.11
google.golang.org/protobuf v1.36.10
gvisor.dev/gvisor v0.0.0-20250428193742-2d800c3129d5
h12.io/socks v1.0.3
lukechampine.com/blake3 v1.4.1

8
go.sum
View File

@@ -38,8 +38,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/miekg/dns v1.1.69 h1:Kb7Y/1Jo+SG+a2GtfoFUfDkG//csdRPwRLkCsxDG9Sc=
github.com/miekg/dns v1.1.69/go.mod h1:7OyjD9nEba5OkqQ/hB4fy3PIoxafSZJtducccIelz3g=
github.com/miekg/dns v1.1.68 h1:jsSRkNozw7G/mnmXULynzMNIsgY2dHC8LO6U6Ij2JEA=
github.com/miekg/dns v1.1.68/go.mod h1:fujopn7TB3Pu3JM69XaawiU0wqjpL9/8xGop5UrTPps=
github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8=
github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 h1:JhzVVoYvbOACxoUmOs6V/G4D5nPVUW73rKvXxP4XUJc=
@@ -144,8 +144,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8 h1:
google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
google.golang.org/grpc v1.77.0 h1:wVVY6/8cGA6vvffn+wWK5ToddbgdU3d8MNENr4evgXM=
google.golang.org/grpc v1.77.0/go.mod h1:z0BY1iVj0q8E1uSQCjL9cppRj+gnZjzDnzV0dHhrNig=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE=
google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=

View File

@@ -111,8 +111,7 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn st
destinationOverridden = true
}
}
iConn := stat.TryUnwrapStatsConn(conn)
if tlsConn, ok := iConn.(tls.Interface); ok && !destinationOverridden {
if tlsConn, ok := conn.(tls.Interface); ok && !destinationOverridden {
if serverName := tlsConn.HandshakeContextServerName(ctx); serverName != "" {
dest.Address = net.DomainAddress(serverName)
destinationOverridden = true

View File

@@ -296,7 +296,10 @@ func setUpHTTPTunnel(ctx context.Context, dest net.Destination, target string, u
return nil, err
}
iConn := stat.TryUnwrapStatsConn(rawConn)
iConn := rawConn
if statConn, ok := iConn.(*stat.CounterConnection); ok {
iConn = statConn.Connection
}
nextProto := ""
if tlsConn, ok := iConn.(*tls.Conn); ok {

View File

@@ -787,7 +787,10 @@ func readV(ctx context.Context, reader buf.Reader, writer buf.Writer, timer sign
}
func IsRAWTransportWithoutSecurity(conn stat.Connection) bool {
iConn := stat.TryUnwrapStatsConn(conn)
iConn := conn
if statConn, ok := iConn.(*stat.CounterConnection); ok {
iConn = statConn.Connection
}
_, ok1 := iConn.(*proxyproto.Conn)
_, ok2 := iConn.(*net.TCPConn)
_, ok3 := iConn.(*internet.UnixConnWrapper)

View File

@@ -147,7 +147,11 @@ func (s *Server) Network() []net.Network {
// Process implements proxy.Inbound.Process().
func (s *Server) Process(ctx context.Context, network net.Network, conn stat.Connection, dispatcher routing.Dispatcher) error {
iConn := stat.TryUnwrapStatsConn(conn)
iConn := conn
statConn, ok := iConn.(*stat.CounterConnection)
if ok {
iConn = statConn.Connection
}
sessionPolicy := s.policyManager.ForLevel(0)
if err := conn.SetReadDeadline(time.Now().Add(sessionPolicy.Timeouts.Handshake)); err != nil {

View File

@@ -265,7 +265,10 @@ func (*Handler) Network() []net.Network {
// Process implements proxy.Inbound.Process().
func (h *Handler) Process(ctx context.Context, network net.Network, connection stat.Connection, dispatcher routing.Dispatcher) error {
iConn := stat.TryUnwrapStatsConn(connection)
iConn := connection
if statConn, ok := iConn.(*stat.CounterConnection); ok {
iConn = statConn.Connection
}
if h.decryption != nil {
var err error

View File

@@ -192,7 +192,10 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte
ob.Conn = conn // for Vision's pre-connect
iConn := stat.TryUnwrapStatsConn(conn)
iConn := conn
if statConn, ok := iConn.(*stat.CounterConnection); ok {
iConn = statConn.Connection
}
target := ob.Target
errors.LogInfo(ctx, "tunneling request to ", target, " via ", rec.Destination.NetAddr())

View File

@@ -229,7 +229,10 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s
return errors.New("unable to set read deadline").Base(err).AtWarning()
}
iConn := stat.TryUnwrapStatsConn(connection)
iConn := connection
if statConn, ok := iConn.(*stat.CounterConnection); ok {
iConn = statConn.Connection
}
_, isDrain := iConn.(*net.TCPConn)
if !isDrain {
_, isDrain = iConn.(*net.UnixConn)

View File

@@ -300,14 +300,14 @@ func (h *Handler) createIPCRequest() string {
errors.LogInfo(h.bind.ctx, "createIPCRequest use dialer dest ip: ", addr)
} else {
ips, _, err := h.dns.LookupIP(addr.Domain(), dns.IPOption{
IPv4Enable: h.conf.preferIP4(),
IPv6Enable: h.conf.preferIP6(),
IPv4Enable: h.hasIPv4 && h.conf.preferIP4(),
IPv6Enable: h.hasIPv6 && h.conf.preferIP6(),
})
{ // Resolve fallback
if (len(ips) == 0 || err != nil) && h.conf.hasFallback() {
ips, _, err = h.dns.LookupIP(addr.Domain(), dns.IPOption{
IPv4Enable: h.conf.fallbackIP4(),
IPv6Enable: h.conf.fallbackIP6(),
IPv4Enable: h.hasIPv4 && h.conf.fallbackIP4(),
IPv6Enable: h.hasIPv6 && h.conf.fallbackIP6(),
})
}
}

View File

@@ -32,13 +32,3 @@ func (c *CounterConnection) Write(b []byte) (int, error) {
}
return nBytes, err
}
func TryUnwrapStatsConn(conn net.Conn) net.Conn {
if conn == nil {
return conn
}
if conn, ok := conn.(*CounterConnection); ok {
return conn.Connection
}
return conn
}

View File

@@ -3,7 +3,6 @@ package pipe
import (
"errors"
"io"
"runtime"
"sync"
"time"
@@ -136,11 +135,10 @@ func (p *pipe) writeMultiBufferInternal(mb buf.MultiBuffer) error {
if p.data == nil {
p.data = mb
return nil
} else {
p.data, _ = buf.MergeMulti(p.data, mb)
}
p.data, _ = buf.MergeMulti(p.data, mb)
return errSlowDown
return nil
}
func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error {
@@ -155,30 +153,23 @@ func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error {
return nil
}
if err == errSlowDown {
p.readSignal.Signal()
// Yield current goroutine. Hopefully the reading counterpart can pick up the payload.
runtime.Gosched()
return nil
if err == errBufferFull {
if p.option.discardOverflow {
buf.ReleaseMulti(mb)
return nil
}
select {
case <-p.writeSignal.Wait():
continue
case <-p.done.Wait():
buf.ReleaseMulti(mb)
return io.ErrClosedPipe
}
}
if err == errBufferFull && p.option.discardOverflow {
buf.ReleaseMulti(mb)
return nil
}
if err != errBufferFull {
buf.ReleaseMulti(mb)
p.readSignal.Signal()
return err
}
select {
case <-p.writeSignal.Wait():
case <-p.done.Wait():
return io.ErrClosedPipe
}
buf.ReleaseMulti(mb)
p.readSignal.Signal()
return err
}
}