Compare commits

...

6 Commits

Author SHA1 Message Date
风扇滑翔翼
7dc1c4fc5b Fix mux 2025-12-23 11:43:44 +00:00
风扇滑翔翼
ea5a53ce6d Try fix 2025-12-23 11:18:54 +00:00
风扇滑翔翼
c6a76ff281 format 2025-12-22 05:13:52 +00:00
风扇滑翔翼
42214c3287 Use bufferSize to calc channel buffer 2025-12-22 02:47:03 +00:00
风扇滑翔翼
8c0d32f6f1 Magic 2025-12-21 18:00:13 +00:00
风扇滑翔翼
d5f17ab4fc Try to optimize pipe performance 2025-12-11 11:59:22 +00:00
3 changed files with 140 additions and 28 deletions

View File

@@ -2,10 +2,12 @@ package buf
import (
"io"
"sync"
"time"
"github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/common/signal"
"github.com/xtls/xray-core/features/policy"
"github.com/xtls/xray-core/features/stats"
)
@@ -113,7 +115,12 @@ func Copy(reader Reader, writer Writer, options ...CopyOption) error {
for _, option := range options {
option(&handler)
}
err := copyInternal(reader, writer, &handler)
var err error
if shouldUseCopyV(reader, writer) {
err = copyVInternal(reader, writer, &handler)
} else {
err = copyInternal(reader, writer, &handler)
}
if err != nil && errors.Cause(err) != io.EOF {
return err
}
@@ -133,3 +140,112 @@ func CopyOnceTimeout(reader Reader, writer Writer, timeout time.Duration) error
}
return writer.WriteMultiBuffer(mb)
}
func shouldUseCopyV(reader Reader, writer Writer) bool {
// if writer is not support writeV, directly return false
if _, ok := writer.(*BufferToBytesWriter); !ok {
return false
}
// try to figure out if the underlying reader is SingleReader
var doCopyV bool
if tr, ok := reader.(*TimeoutWrapperReader); ok {
if _, ok := tr.Reader.(*SingleReader); ok {
doCopyV = true
}
}
if _, ok := reader.(*SingleReader); ok {
doCopyV = true
}
return doCopyV
}
func copyVInternal(r Reader, w Writer, handler *copyHandler) error {
// channel buffer size is maxBuffer/maxPerPacketLen (ignore the case of many small packets)
// default buffer size:
// 0 in ARM MIPS MIPSLE
// 4kb in ARM64 MIPS64 MIPS64LE
// 512kb in others
channelBuffer := (policy.SessionDefault().Buffer.PerConnection) / Size
if channelBuffer <= 0 {
channelBuffer = 4
}
cache := make(chan *Buffer, channelBuffer)
stopRead := make(chan struct{})
var rErr error
var wErr error
wg := sync.WaitGroup{}
wg.Add(2)
// downlink
go func() {
defer wg.Done()
defer close(cache)
for {
mb, err := r.ReadMultiBuffer()
for _, b := range mb {
if err == nil {
select {
case cache <- b:
// must be write error
case <-stopRead:
b.Release()
return
}
} else {
rErr = err
select {
case cache <- b:
case <-stopRead:
b.Release()
}
return
}
}
}
}()
// uplink
go func() {
defer wg.Done()
for {
b, ok := <-cache
if !ok {
return
}
var buffers = []*Buffer{b}
for stop := false; !stop; {
select {
case b, ok := <-cache:
if !ok {
stop = true
continue
}
buffers = append(buffers, b)
default:
stop = true
}
}
mb := MultiBuffer(buffers)
err := w.WriteMultiBuffer(mb)
for _, handler := range handler.onData {
handler(mb)
}
ReleaseMulti(mb)
if err != nil {
wErr = err
close(stopRead)
return
}
}
}()
wg.Wait()
// drain cache
for b := range cache {
b.Release()
}
if wErr != nil {
return writeError{wErr}
}
if rErr != nil {
return readError{rErr}
}
return nil
}

View File

@@ -159,6 +159,11 @@ func (r *SingleReader) ReadMultiBuffer() (MultiBuffer, error) {
return MultiBuffer{b}, err
}
func (r *SingleReader) readBuffer() (*Buffer, error) {
b, err := ReadBuffer(r.Reader)
return b, err
}
// PacketReader is a Reader that read one Buffer every time.
type PacketReader struct {
io.Reader

View File

@@ -3,7 +3,6 @@ package pipe
import (
"errors"
"io"
"runtime"
"sync"
"time"
@@ -136,11 +135,10 @@ func (p *pipe) writeMultiBufferInternal(mb buf.MultiBuffer) error {
if p.data == nil {
p.data = mb
return nil
} else {
p.data, _ = buf.MergeMulti(p.data, mb)
}
p.data, _ = buf.MergeMulti(p.data, mb)
return errSlowDown
return nil
}
func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error {
@@ -155,30 +153,23 @@ func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error {
return nil
}
if err == errSlowDown {
p.readSignal.Signal()
// Yield current goroutine. Hopefully the reading counterpart can pick up the payload.
runtime.Gosched()
return nil
if err == errBufferFull {
if p.option.discardOverflow {
buf.ReleaseMulti(mb)
return nil
}
select {
case <-p.writeSignal.Wait():
continue
case <-p.done.Wait():
buf.ReleaseMulti(mb)
return io.ErrClosedPipe
}
}
if err == errBufferFull && p.option.discardOverflow {
buf.ReleaseMulti(mb)
return nil
}
if err != errBufferFull {
buf.ReleaseMulti(mb)
p.readSignal.Signal()
return err
}
select {
case <-p.writeSignal.Wait():
case <-p.done.Wait():
return io.ErrClosedPipe
}
buf.ReleaseMulti(mb)
p.readSignal.Signal()
return err
}
}