Compare commits

...

13 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
31d10f3544 Restore unified reader architecture per @RPRX request
Reverted timeout-based solution and restored the unified reader architecture:
- Each peer connection continuously reads and queues to dataChan
- Single unifiedReader() dispatcher matches data with read requests
- No blocking - all connections monitored simultaneously
- Addresses @RPRX's request for unified reader instead of timeout

Architecture benefits:
- True concurrent reading from all peer connections
- Clean separation between reading and dispatching
- No timeout delays or retry loops
- Scalable to any number of peers

Tests pass.

Co-authored-by: RPRX <63339210+RPRX@users.noreply.github.com>
2026-01-11 09:47:37 +00:00
copilot-swe-agent[bot]
1ad1608581 Simplify multi-peer WireGuard fix with read timeout
Replaced complex unified reader with minimal changes:
- Add 50ms read deadline to prevent readers from blocking indefinitely
- Use core.ToBackgroundDetachedContext for connection independence
- Clear deadline after read completes
- Allows multiple peer readers to time out and retry instead of blocking

This is much simpler than the unified reader architecture while still
solving the core blocking issue. When a reader times out, it returns
and another reader can try, allowing all peers to work.

Tests pass.

Co-authored-by: RPRX <63339210+RPRX@users.noreply.github.com>
2026-01-11 09:42:32 +00:00
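
The reverted timeout code above is not part of the final diff, so as a rough sketch of the technique the message describes (names are illustrative; only the 50ms deadline and the clear-after-read step come from the message):

	package wireguard

	import (
		"errors"
		"net"
		"os"
		"time"
	)

	// readWithDeadline bounds a blocking Read with a short deadline so a
	// reader that has no data returns quickly and another peer's reader
	// can take the next read request. Sketch only, not the reverted code.
	func readWithDeadline(conn net.Conn, buf []byte) (n int, timedOut bool, err error) {
		_ = conn.SetReadDeadline(time.Now().Add(50 * time.Millisecond))
		n, err = conn.Read(buf)
		_ = conn.SetReadDeadline(time.Time{}) // clear the deadline after the read
		if errors.Is(err, os.ErrDeadlineExceeded) {
			return n, true, nil // timed out: retry instead of failing the read
		}
		return n, false, err
	}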
copilot-swe-agent[bot]
c00c697b65 Address code review feedback - improve robustness
- Use named constant for dataChan buffer size
- Add bounds checking to prevent panic if n > len(data)
- Only send valid data portion (buf[:n]) to dataChan
- Use sync.Once to prevent double-close panic in Close()
- Add comment explaining data loss risk (acceptable for UDP-like behavior)

All tests pass, no security vulnerabilities found.

Co-authored-by: RPRX <63339210+RPRX@users.noreply.github.com>
2026-01-11 09:23:33 +00:00
copilot-swe-agent[bot]
4e0a87faf4 Implement unified reader architecture for multi-peer WireGuard
Replaced the competing reader goroutines with a unified reading architecture:
- Each peer connection continuously reads into a shared data channel
- A single dispatcher goroutine matches received data with read requests
- Eliminates blocking issues - all connections are monitored simultaneously
- No more race conditions between peer readers

This addresses @RPRX's suggestion to "统一 read 后再分给指定的 peer reader"
(unified read then distribute to specified peer readers).

Architecture:
- connectTo() registers connection and starts a dedicated reader per connection
- Each connection reader continuously reads and sends to dataChan
- unifiedReader() dispatcher waits for data, then matches with pending requests
- All peers can receive simultaneously without any blocking

Tests pass successfully.

Co-authored-by: RPRX <63339210+RPRX@users.noreply.github.com>
2026-01-11 09:19:10 +00:00
copilot-swe-agent[bot]
2d37e84d4d Add context isolation for peer connections
Use core.ToBackgroundDetachedContext to prevent all peer connections from
being cancelled when the original request context is cancelled. This ensures
peer connections remain independent and stable.

Tests pass, no security issues found.

Co-authored-by: RPRX <63339210+RPRX@users.noreply.github.com>
2026-01-11 01:11:57 +00:00
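
core.ToBackgroundDetachedContext is Xray-core's helper for this; the underlying technique is the usual "detached context" pattern (what the standard library exposes as context.WithoutCancel since Go 1.21). A minimal sketch of the idea, not the helper's actual implementation:

	package wireguard

	import (
		"context"
		"time"
	)

	// detachedContext keeps the parent's values but never reports a
	// deadline or cancellation, so connections created from it outlive
	// the request that triggered them. Concept sketch only; the actual
	// code calls core.ToBackgroundDetachedContext.
	type detachedContext struct{ context.Context }

	func (detachedContext) Deadline() (time.Time, bool) { return time.Time{}, false }
	func (detachedContext) Done() <-chan struct{}       { return nil }
	func (detachedContext) Err() error                  { return nil }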
copilot-swe-agent[bot]
47a1e042e4 Simplified fix: Set workers to peer count for multi-peer support
The issue was that with only 1 worker (default), when a peer's reader goroutine
blocked waiting for data, it prevented other peers from receiving packets.

Simple solution: Automatically set workers to the number of peers if not
explicitly configured. This allows each peer to have its own worker thread
for concurrent packet reception.

- Reverted complex architectural changes
- Added simple logic to set workers = len(peers) when NumWorkers not set
- Much simpler and easier to understand than previous approach

Co-authored-by: RPRX <63339210+RPRX@users.noreply.github.com>
2026-01-11 01:09:04 +00:00
copilot-swe-agent[bot]
cc36c1b5bf Improve error handling in packet reader - check error before processing
Only clear reserved bytes if read was successful (err == nil). This prevents
processing invalid data when conn.Read() returns an error.

Code review feedback addressed.

Co-authored-by: RPRX <63339210+RPRX@users.noreply.github.com>
2026-01-09 11:28:43 +00:00
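
In sketch form (the reserved bytes are buff[1:4], cleared by the reader shown in the diff below; the function name here is illustrative):

	package wireguard

	import "net"

	// readAndClear clears the reserved bytes only when the read actually
	// succeeded, so an error can never leave us rewriting a buffer that
	// holds no valid packet.
	func readAndClear(conn net.Conn, buff []byte) (int, error) {
		n, err := conn.Read(buff)
		if err == nil && n > 3 {
			buff[1], buff[2], buff[3] = 0, 0, 0
		}
		return n, err
	}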
copilot-swe-agent[bot]
ea3badc641 Fix multi-peer WireGuard by redesigning packet dispatch architecture
The root cause was architectural: each peer connection created a goroutine
that competed for the same readQueue. When a goroutine grabbed a read request
but its connection had no data, it would block, preventing other peers from
receiving packets. This caused the "only one peer works at a time" behavior.

Solution: Redesigned the packet flow:
- Each peer connection now continuously reads from its socket and sends
  packets to a shared packetQueue
- A dispatcher goroutine matches readQueue requests (from WireGuard) with
  packets from packetQueue
- This allows all peer connections to work simultaneously without blocking

Changes:
- Added packetQueue channel and receivedPacket struct to buffer packets
- Modified Open() to start a dispatcher goroutine
- Rewrote connectTo() to continuously read and queue packets
- Each peer connection now operates independently

Tests pass. This architectural fix addresses the fundamental issue with
multi-peer WireGuard support.

Co-authored-by: RPRX <63339210+RPRX@users.noreply.github.com>
2026-01-09 11:25:38 +00:00
copilot-swe-agent[bot]
41050594e5 Fix multi-peer WireGuard by using detached context for bind
The issue was in client.go, not server.go. When WireGuard is used as an
outbound with multiple peers, all peers were sharing the same context from
the first connection. This caused all subsequent peer connections to be
associated with the first connection's session ID, leading to routing failures.

The fix uses core.ToBackgroundDetachedContext() to create an independent
context for the netBindClient, allowing each peer connection to work
independently with its own session context.

- Reverted incorrect changes to server.go
- Fixed client.go to use detached context for the bind
- Tests pass successfully

Co-authored-by: RPRX <63339210+RPRX@users.noreply.github.com>
2026-01-09 11:08:10 +00:00
copilot-swe-agent[bot]
ecef77ff48 Final review complete - all checks pass
Co-authored-by: RPRX <63339210+RPRX@users.noreply.github.com>
2026-01-09 10:41:06 +00:00
copilot-swe-agent[bot]
52f7f3d174 Optimize with double-checked locking for better concurrency
Co-authored-by: RPRX <63339210+RPRX@users.noreply.github.com>
2026-01-09 10:35:22 +00:00
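
This one-line message leaves the pattern implicit; generically, double-checked locking in Go looks like the following (names are illustrative, not taken from the PR):

	package wireguard

	import "sync"

	type lazyState struct {
		mu    sync.RWMutex
		conns map[string]string // stands in for whatever shared state is guarded
	}

	// get takes the cheap read lock on the fast path and re-checks under
	// the write lock before initializing, so only one goroutine ever
	// initializes and readers rarely contend.
	func (l *lazyState) get() map[string]string {
		l.mu.RLock()
		c := l.conns
		l.mu.RUnlock()
		if c != nil {
			return c // fast path: already initialized
		}
		l.mu.Lock()
		defer l.mu.Unlock()
		if l.conns == nil { // second check: another goroutine may have won
			l.conns = make(map[string]string)
		}
		return l.conns
	}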
copilot-swe-agent[bot]
385867e82b Fix race condition in WireGuard server with concurrent peer connections
Add mutex protection to server.go to prevent race condition when multiple
peers connect simultaneously. The shared routingInfo field was being
overwritten by concurrent Process() calls, causing connections to fail.

- Add sync.RWMutex to protect access to routing info
- Only update routing info if not already set or dispatcher changed
- Use local copy of routing info in forwardConnection to avoid races
- Existing tests pass

Co-authored-by: RPRX <63339210+RPRX@users.noreply.github.com>
2026-01-09 10:28:10 +00:00
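
server.go's version of this change was reverted later in the series, so it does not appear in the diff below; a sketch of the locking the message describes (all names illustrative):

	package wireguard

	import "sync"

	type routingInfo struct {
		dispatcher any // stands in for the session's dispatcher
	}

	type server struct {
		infoMutex   sync.RWMutex
		routingInfo routingInfo
	}

	// setRoutingInfo only overwrites the shared routing info when it is
	// unset or the dispatcher has changed, as the commit describes.
	func (s *server) setRoutingInfo(info routingInfo) {
		s.infoMutex.Lock()
		defer s.infoMutex.Unlock()
		if s.routingInfo.dispatcher == nil || s.routingInfo.dispatcher != info.dispatcher {
			s.routingInfo = info
		}
	}

	// forwardConnection works from a local copy so concurrent Process()
	// calls cannot race with it.
	func (s *server) forwardConnection() {
		s.infoMutex.RLock()
		info := s.routingInfo // local copy taken under the read lock
		s.infoMutex.RUnlock()
		_ = info // ...use the copy from here on
	}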
copilot-swe-agent[bot]
a99fe66467 Initial plan
2026-01-09 10:20:41 +00:00
3 changed files with 125 additions and 19 deletions

go.mod

@@ -25,6 +25,7 @@ require (
 	golang.org/x/net v0.48.0
 	golang.org/x/sync v0.19.0
 	golang.org/x/sys v0.39.0
+	golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2
 	golang.zx2c4.com/wireguard v0.0.0-20231211153847-12269c276173
 	google.golang.org/grpc v1.78.0
 	google.golang.org/protobuf v1.36.11
@@ -50,7 +51,6 @@ require (
 	golang.org/x/text v0.32.0 // indirect
 	golang.org/x/time v0.12.0 // indirect
 	golang.org/x/tools v0.39.0 // indirect
-	golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect
 	google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda // indirect
 	gopkg.in/yaml.v2 v2.4.0 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect


@@ -124,6 +124,26 @@ type netBindClient struct {
 	ctx      context.Context
 	dialer   internet.Dialer
 	reserved []byte
+
+	// Track all peer connections for unified reading
+	connMutex sync.RWMutex
+	conns     map[*netEndpoint]net.Conn
+	dataChan  chan *receivedData
+	closeChan chan struct{}
+	closeOnce sync.Once
 }
 
+const (
+	// Buffer size for dataChan - allows some buffering of received packets
+	// while dispatcher matches them with read requests
+	dataChannelBufferSize = 100
+)
+
+type receivedData struct {
+	data     []byte
+	n        int
+	endpoint *netEndpoint
+	err      error
+}
+
 func (bind *netBindClient) connectTo(endpoint *netEndpoint) error {
@@ -133,34 +153,114 @@ func (bind *netBindClient) connectTo(endpoint *netEndpoint) error {
 	}
 	endpoint.conn = c
 
-	go func(readQueue <-chan *netReadInfo, endpoint *netEndpoint) {
+	// Initialize channels on first connection
+	bind.connMutex.Lock()
+	if bind.conns == nil {
+		bind.conns = make(map[*netEndpoint]net.Conn)
+		bind.dataChan = make(chan *receivedData, dataChannelBufferSize)
+		bind.closeChan = make(chan struct{})
+		// Start unified reader dispatcher
+		go bind.unifiedReader()
+	}
+	bind.conns[endpoint] = c
+	bind.connMutex.Unlock()
+
+	// Start a reader goroutine for this specific connection
+	go func(conn net.Conn, endpoint *netEndpoint) {
+		const maxPacketSize = 1500
 		for {
-			v, ok := <-readQueue
-			if !ok {
+			select {
+			case <-bind.closeChan:
 				return
+			default:
 			}
+
+			buf := make([]byte, maxPacketSize)
+			n, err := conn.Read(buf)
+
+			// Send only the valid data portion to dispatcher
+			dataToSend := buf
+			if n > 0 && n < len(buf) {
+				dataToSend = buf[:n]
+			}
+
+			// Send received data to dispatcher
+			select {
+			case bind.dataChan <- &receivedData{
+				data:     dataToSend,
+				n:        n,
+				endpoint: endpoint,
+				err:      err,
+			}:
+			case <-bind.closeChan:
+				return
+			}
-			i, err := c.Read(v.buff)
-
-			if i > 3 {
-				v.buff[1] = 0
-				v.buff[2] = 0
-				v.buff[3] = 0
-			}
-
-			v.bytes = i
-			v.endpoint = endpoint
-			v.err = err
-			v.waiter.Done()
+
 			if err != nil {
+				bind.connMutex.Lock()
+				delete(bind.conns, endpoint)
 				endpoint.conn = nil
+				bind.connMutex.Unlock()
 				return
 			}
 		}
-	}(bind.readQueue, endpoint)
+	}(c, endpoint)
 
 	return nil
 }
+
+// unifiedReader dispatches received data to waiting read requests
+func (bind *netBindClient) unifiedReader() {
+	for {
+		select {
+		case data := <-bind.dataChan:
+			// Bounds check to prevent panic
+			if data.n > len(data.data) {
+				data.n = len(data.data)
+			}
+			// Wait for a read request with timeout to prevent blocking forever
+			select {
+			case v := <-bind.readQueue:
+				// Copy data to request buffer
+				n := copy(v.buff, data.data[:data.n])
+				// Clear reserved bytes if needed
+				if n > 3 {
+					v.buff[1] = 0
+					v.buff[2] = 0
+					v.buff[3] = 0
+				}
+				v.bytes = n
+				v.endpoint = data.endpoint
+				v.err = data.err
+				v.waiter.Done()
+			case <-bind.closeChan:
+				return
+			}
+		case <-bind.closeChan:
+			return
+		}
+	}
+}
+
+// Close implements conn.Bind.Close for netBindClient
+func (bind *netBindClient) Close() error {
+	// Use sync.Once to prevent double-close panic
+	bind.closeOnce.Do(func() {
+		bind.connMutex.Lock()
+		if bind.closeChan != nil {
+			close(bind.closeChan)
+		}
+		bind.connMutex.Unlock()
+	})
+	// Call parent Close
+	return bind.netBind.Close()
+}
 
 func (bind *netBindClient) Send(buff [][]byte, endpoint conn.Endpoint) error {
 	var err error
func (bind *netBindClient) Send(buff [][]byte, endpoint conn.Endpoint) error {
var err error


@@ -114,6 +114,12 @@ func (h *Handler) processWireGuard(ctx context.Context, dialer internet.Dialer)
 	}
 
 	// bind := conn.NewStdNetBind() // TODO: conn.Bind wrapper for dialer
+	// Set workers to number of peers if not explicitly configured
+	// This allows concurrent packet reception from multiple peers
+	workers := int(h.conf.NumWorkers)
+	if workers <= 0 && len(h.conf.Peers) > 0 {
+		workers = len(h.conf.Peers)
+	}
 	h.bind = &netBindClient{
 		netBind: netBind{
 			dns: h.dns,
@@ -121,9 +127,9 @@ func (h *Handler) processWireGuard(ctx context.Context, dialer internet.Dialer)
 				IPv4Enable: h.hasIPv4,
 				IPv6Enable: h.hasIPv6,
 			},
-			workers: int(h.conf.NumWorkers),
+			workers: workers,
 		},
-		ctx:      ctx,
+		ctx:      core.ToBackgroundDetachedContext(ctx),
 		dialer:   dialer,
 		reserved: h.conf.Reserved,
 	}