Compare commits

..

5 Commits

Author SHA1 Message Date
风扇滑翔翼
8351ccb625 Add NewClient() 2025-09-10 05:12:40 +00:00
风扇滑翔翼
f42a518bf6 Refine time usage 2025-09-10 04:56:41 +00:00
风扇滑翔翼
033d2ba2b9 Remove init 2025-09-06 19:21:29 +00:00
风扇滑翔翼
30e3fa690e Add use remote time for vmess 2025-09-06 18:23:12 +00:00
RPRX
6ec0291d4e app/reverse/bridge.go: Fix DispatchLink() returns immediately
Fixes https://github.com/XTLS/Xray-core/issues/5088
2025-09-05 15:58:49 +00:00
5 changed files with 126 additions and 20 deletions

View File

@@ -149,25 +149,23 @@ func (w *BridgeWorker) Connections() uint32 {
}
func (w *BridgeWorker) handleInternalConn(link *transport.Link) {
go func() {
reader := link.Reader
for {
mb, err := reader.ReadMultiBuffer()
if err != nil {
reader := link.Reader
for {
mb, err := reader.ReadMultiBuffer()
if err != nil {
break
}
for _, b := range mb {
var ctl Control
if err := proto.Unmarshal(b.Bytes(), &ctl); err != nil {
errors.LogInfoInner(context.Background(), err, "failed to parse proto message")
break
}
for _, b := range mb {
var ctl Control
if err := proto.Unmarshal(b.Bytes(), &ctl); err != nil {
errors.LogInfoInner(context.Background(), err, "failed to parse proto message")
break
}
if ctl.State != w.state {
w.state = ctl.State
}
if ctl.State != w.state {
w.state = ctl.State
}
}
}()
}
}
func (w *BridgeWorker) Dispatch(ctx context.Context, dest net.Destination) (*transport.Link, error) {
@@ -182,7 +180,7 @@ func (w *BridgeWorker) Dispatch(ctx context.Context, dest net.Destination) (*tra
uplinkReader, uplinkWriter := pipe.New(opt...)
downlinkReader, downlinkWriter := pipe.New(opt...)
w.handleInternalConn(&transport.Link{
go w.handleInternalConn(&transport.Link{
Reader: downlinkReader,
Writer: uplinkWriter,
})

View File

@@ -0,0 +1,34 @@
package http
import (
"context"
gohttp "net/http"
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/transport/internet"
)
// NewClient creates an HTTP client with with internal dialer and using the given sockopt.
// sockopt can only have one or empty.
func NewClient(sockopt ...*internet.SocketConfig) *gohttp.Client {
var Sockopt *internet.SocketConfig
switch len(sockopt) {
case 0:
case 1:
Sockopt = sockopt[0]
default:
panic("sockopt can only be nil or have one")
}
httpClient := &gohttp.Client{
Transport: &gohttp.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
dest, err := net.ParseDestination(network + ":" + addr)
if err != nil {
return nil, err
}
return internet.DialSystem(ctx, dest, Sockopt)
},
},
}
return httpClient
}

View File

@@ -10,10 +10,10 @@ import (
"hash/crc32"
"io"
"math"
"time"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/antireplay"
"github.com/xtls/xray-core/proxy/vmess/vtime"
)
var (
@@ -105,7 +105,7 @@ func (a *AuthIDDecoderHolder) Match(authID [16]byte) (interface{}, error) {
continue
}
if math.Abs(math.Abs(float64(t))-float64(time.Now().Unix())) > 120 {
if math.Abs(math.Abs(float64(t))-float64(vtime.Now().Unix())) > 120 {
continue
}

View File

@@ -5,14 +5,14 @@ import (
"crypto/rand"
"encoding/binary"
"io"
"time"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/crypto"
"github.com/xtls/xray-core/proxy/vmess/vtime"
)
func SealVMessAEADHeader(key [16]byte, data []byte) []byte {
generatedAuthID := CreateAuthID(key[:], time.Now().Unix())
generatedAuthID := CreateAuthID(key[:], vtime.Now().Unix())
connectionNonce := make([]byte, 8)
if _, err := io.ReadFull(rand.Reader, connectionNonce); err != nil {

View File

@@ -0,0 +1,74 @@
package vtime
import (
"context"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/common/platform"
"github.com/xtls/xray-core/common/protocol/http"
)
var timeOffset atomic.Pointer[time.Duration]
var initOnce sync.Once
func updateTimeMonitor(ctx context.Context, domain string) {
for {
select {
case <-ctx.Done():
return
case <-time.After(10 * time.Minute):
err := updateTime(domain)
if err != nil {
errors.LogError(ctx, err)
}
}
}
}
func updateTime(domain string) error {
httpClient := http.NewClient()
resp, err := httpClient.Get(domain)
if err != nil {
return errors.New("Failed to access monitor domain").Base(err)
}
timeHeader := resp.Header.Get("Date")
remoteTime, err := time.Parse(time.RFC1123, timeHeader)
if err != nil {
return errors.New("Failed to parse time from monitor domain").Base(err)
}
localTime := time.Now()
offset := remoteTime.Sub(localTime)
if offset < 2*time.Second && offset > -2*time.Second {
errors.LogWarning(context.Background(), "Time offset too small, ignoring:", offset)
return nil
}
timeOffset.Store(&offset)
return nil
}
func Now() time.Time {
initOnce.Do(func() {
timeOffset.Store(new(time.Duration))
go func() {
domain := platform.NewEnvFlag("xray.vmess.time.domain").GetValue(func() string { return "https://apple.com" })
if domain == "" {
errors.LogError(context.Background(), "vmess time domain is empty, skip time sync")
return
}
err := updateTime(domain)
if err != nil {
errors.LogError(context.Background(), err)
}
errors.LogWarning(context.Background(), "Initial time offset for vmess:", timeOffset.Load())
// only one sync should be enough, so disable periodic update for now
//go updateTimeMonitor(context.TODO(), domain)
}()
runtime.Gosched()
})
offset := timeOffset.Load()
return time.Now().Add(*offset)
}