很抱歉,上一个代码示例可能存在一些问题,导致无法使用。以下是另一个简单的 SOCKS5 代理服务器实现示例,您可以尝试使用它进行测试:
import select
import socket
class Socks5Proxy:
    """Minimal SOCKS5 proxy (RFC 1928): CONNECT command, no authentication.

    Clients are served one at a time on the thread that calls ``start``; a
    new connection is only accepted after the previous one has been closed.
    Each protocol message is assumed to arrive in a single ``recv`` call,
    which holds for the tiny SOCKS5 handshake messages in practice but is
    not guaranteed by TCP.
    """

    # Protocol constants from RFC 1928.
    _VERSION = 5
    _CMD_CONNECT = 1
    _ATYP_IPV4 = 1
    _ATYP_DOMAIN = 3
    _ATYP_IPV6 = 4
    _METHOD_NO_AUTH = 0
    _METHOD_NONE_ACCEPTABLE = 0xFF
    _REP_SUCCEEDED = 0
    _REP_GENERAL_FAILURE = 1

    def __init__(self):
        """Create the (not yet bound) listening socket."""
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Allow quick restarts without waiting for TIME_WAIT to expire.
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    def start(self, host='0.0.0.0', port=1080):
        """Bind to ``(host, port)`` and serve clients forever.

        This call never returns normally. Per-connection failures are
        printed and do not stop the accept loop.
        """
        server_address = (host, port)
        print('starting up on %s port %s' % server_address)
        self.sock.bind(server_address)
        self.sock.listen(1)
        while True:
            print('waiting for a connection...')
            client_sock, client_address = self.sock.accept()
            print('new connection from %s:%d' % client_address)
            self._handle_client(client_sock)

    def _handle_client(self, client_sock):
        """Run one full SOCKS5 session on ``client_sock`` and close it."""
        # Initialised up front so the cleanup below is safe even when the
        # session fails before the outbound connection exists.
        target_sock = None
        try:
            self._negotiate(client_sock)
            try:
                host_address, port_number = self._parse_request(
                    client_sock.recv(1024))
                print('target address:', host_address)
                print('target port:', port_number)
                # create_connection resolves names and picks the right
                # address family, so IPv4, IPv6 and domain targets all work.
                target_sock = socket.create_connection(
                    (host_address, port_number), timeout=10)
            except (ValueError, OSError):
                # Tell the client the request failed instead of leaving it
                # waiting for a reply that never comes.
                self._send_reply(client_sock, self._REP_GENERAL_FAILURE)
                raise
            # The 10 s limit applies to connecting only; an idle tunnel
            # must not be torn down by a socket timeout.
            target_sock.settimeout(None)
            self._send_reply(client_sock, self._REP_SUCCEEDED)
            self._relay(client_sock, target_sock)
        except Exception as e:
            # Top-level boundary: one misbehaving client must never take
            # the whole server down.
            print(str(e))
        finally:
            print('closing connection...')
            client_sock.close()
            if target_sock is not None:
                target_sock.close()

    def _negotiate(self, client_sock):
        """Perform the method-selection handshake (no-auth only).

        Raises:
            ValueError: the greeting is not SOCKS5 or does not offer the
                "no authentication" method.
        """
        data = client_sock.recv(1024)
        if len(data) < 2 or data[0] != self._VERSION:
            raise ValueError('unsupported socks version')
        nmethods = data[1]
        if self._METHOD_NO_AUTH not in data[2:2 + nmethods]:
            # RFC 1928: answer 0xFF when none of the offered methods is
            # acceptable, then the connection is closed.
            client_sock.sendall(
                bytes([self._VERSION, self._METHOD_NONE_ACCEPTABLE]))
            raise ValueError('unsupported authentication method')
        client_sock.sendall(bytes([self._VERSION, self._METHOD_NO_AUTH]))

    @classmethod
    def _parse_request(cls, data):
        """Decode a SOCKS5 CONNECT request into ``(host, port)``.

        Args:
            data: raw request bytes (VER, CMD, RSV, ATYP, DST.ADDR,
                DST.PORT). Bytes after the two-byte port are ignored.

        Returns:
            Tuple of the destination host (dotted IPv4, IPv6 text, or
            domain name) and the destination port as an int.

        Raises:
            ValueError: the request is truncated, is not a version-5
                CONNECT, or uses an unknown address type.
        """
        if len(data) < 4:
            raise ValueError('truncated request')
        if data[0] != cls._VERSION or data[1] != cls._CMD_CONNECT:
            raise ValueError('unsupported command')

        addr_type = data[3]
        addr_start = 4
        if addr_type == cls._ATYP_IPV4:
            addr_end = addr_start + 4
        elif addr_type == cls._ATYP_DOMAIN:
            if len(data) < 5:
                raise ValueError('truncated request')
            # The domain is prefixed with a one-byte length.
            addr_start = 5
            addr_end = addr_start + data[4]
        elif addr_type == cls._ATYP_IPV6:
            addr_end = addr_start + 16
        else:
            raise ValueError('unsupported address type')

        if len(data) < addr_end + 2:
            raise ValueError('truncated request')

        raw_address = data[addr_start:addr_end]
        if addr_type == cls._ATYP_IPV4:
            host_address = socket.inet_ntoa(raw_address)
        elif addr_type == cls._ATYP_IPV6:
            host_address = socket.inet_ntop(socket.AF_INET6, raw_address)
        else:
            host_address = raw_address.decode('utf-8')

        # The port is exactly two big-endian bytes right after the address.
        port_number = int.from_bytes(
            data[addr_end:addr_end + 2], byteorder='big')
        return host_address, port_number

    def _send_reply(self, client_sock, reply_code):
        """Send a SOCKS5 reply carrying ``reply_code``.

        The bound address/port fields are reported as IPv4 0.0.0.0:0.
        """
        client_sock.sendall(
            bytes([self._VERSION, reply_code, 0, self._ATYP_IPV4]) + bytes(6))

    @staticmethod
    def _relay(client_sock, target_sock):
        """Copy bytes in both directions until either side closes.

        ``select`` is used so data is forwarded from whichever peer has
        something to say; strictly alternating reads would stall as soon
        as one side sends twice in a row or the server speaks first.
        """
        peer_of = {client_sock: target_sock, target_sock: client_sock}
        sockets = list(peer_of)
        while True:
            readable, _, _ = select.select(sockets, [], [])
            for source in readable:
                data = source.recv(4096)
                if not data:
                    # EOF on either side ends the whole session.
                    return
                peer_of[source].sendall(data)
if __name__ == '__main__':
    # Script entry point: build a proxy and serve on start()'s default
    # host and port.
    Socks5Proxy().start()
在客户端中,您可以使用以下代码测试该代理服务器(requests 的 SOCKS 支持是可选组件,需要先执行 pip install "requests[socks]" 安装):
import requests
# Route both plain and TLS traffic through the local SOCKS5 proxy.
# With the "socks5" scheme, requests resolves host names locally; use
# "socks5h" instead to have the proxy resolve them.
proxies = {
    'http': 'socks5://localhost:1080',
    'https': 'socks5://localhost:1080',
}

# requests has no default timeout; without one, a stalled proxy would
# block this script forever.
timeout_seconds = 15

response = requests.get(
    "https://www.google.com/", proxies=proxies, timeout=timeout_seconds)
print(response.status_code)  # should return 200 OK

response2 = requests.get(
    "https://api.ipify.org/?format=json", proxies=proxies,
    timeout=timeout_seconds)
print(response2.json()['ip'])  # should print your proxy IP address
请注意,该代码示例仅用于演示 SOCKS5 代理服务器的实现原理,并没有考虑到一些安全和性能问题。在生产环境中使用时,请谨慎评估其安全性和性能表现。