ZBLOG

测试无法使用

很抱歉,可能是因为该代码示例存在一些缺陷导致无法正常工作。以下是一个修复后的版本,您可以尝试使用它进行测试:

import socket
import select

class Socks5Proxy:
    """Minimal single-threaded SOCKS5 proxy supporting the CONNECT command.

    When both ``username`` and ``password`` are given, clients must
    authenticate with the username/password method (0x02). Otherwise only
    the "no authentication required" method (0x00) is accepted.

    Clients are served strictly one at a time: the next connection is
    accepted only after the current relay has finished.
    """

    # Seconds to wait for handshake data and for the outbound connect.
    TIMEOUT = 10

    def __init__(self, username=None, password=None):
        """Store the optional credentials clients must present."""
        self.username = username
        self.password = password

    def start(self, host='0.0.0.0', port=1080):
        """Bind to ``host:port`` and serve clients until interrupted.

        The listening socket is closed when the loop exits for any reason
        (for example on ``KeyboardInterrupt`` or a bind failure).
        """
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        server_address = (host, port)
        print('starting up on %s port %s' % server_address)

        try:
            self.sock.bind(server_address)
            self.sock.listen(1)

            while True:
                print('waiting for a connection...')
                client_sock, client_address = self.sock.accept()
                print('new connection from %s:%d' % client_address)
                self._handle_client(client_sock)
        finally:
            self.sock.close()

    def _handle_client(self, client_sock):
        """Run one full SOCKS5 session on ``client_sock`` and close it.

        Any failure is reported and swallowed here so that one misbehaving
        client cannot terminate the accept loop.
        """
        # Must exist before the try block: the cleanup below runs even when
        # the handshake fails before an outbound socket was ever created.
        target_sock = None
        try:
            # Bound the handshake so a silent client cannot stall the server.
            client_sock.settimeout(self.TIMEOUT)
            self._negotiate_auth(client_sock)
            host_address, port_number = self._read_request(client_sock)

            print('target address:', host_address)
            print('target port:', port_number)

            target_sock = self._connect_target(
                client_sock, host_address, port_number)

            # The relay is driven by select(), so both sockets go back to
            # plain blocking mode for the data phase.
            client_sock.settimeout(None)
            self._relay(client_sock, target_sock)
        except Exception as e:
            # Top-level boundary for a single connection: report and move on.
            print(str(e))
        finally:
            print('closing connection...')
            client_sock.close()
            if target_sock is not None:
                target_sock.close()

    @staticmethod
    def _recv_exact(sock, count):
        """Read exactly ``count`` bytes from ``sock``.

        TCP is a byte stream, so a protocol message may arrive split across
        several ``recv`` calls or glued to the next message.

        Raises:
            ConnectionError: the peer closed the connection early.
        """
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = sock.recv(remaining)
            if not chunk:
                raise ConnectionError('connection closed by peer')
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    @staticmethod
    def _send_reply(client_sock, status):
        """Send a SOCKS5 reply with the given REP ``status`` code.

        The bound address is reported as 0.0.0.0:0.
        """
        # VER, REP, RSV, ATYP (IPv4), BND.ADDR (4 bytes), BND.PORT (2 bytes)
        client_sock.sendall(bytes([5, status, 0, 1, 0, 0, 0, 0, 0, 0]))

    def _choose_method(self, offered):
        """Return the authentication method to use, or 0xFF if none fits.

        ``offered`` is the sequence of method codes sent by the client.
        """
        if self.username is not None and self.password is not None:
            required = 0x02  # username/password
        else:
            required = 0x00  # no authentication
        return required if required in offered else 0xFF

    def _negotiate_auth(self, client_sock):
        """Perform the method negotiation and, if required, authentication.

        Raises:
            Exception: unsupported version, no acceptable method, or
                rejected credentials.
        """
        version, nmethods = self._recv_exact(client_sock, 2)
        if version != 5:  # only support socks version 5
            raise Exception('unsupported socks version')

        offered = self._recv_exact(client_sock, nmethods)
        method = self._choose_method(offered)
        client_sock.sendall(bytes([5, method]))

        if method == 0xFF:
            raise Exception('no acceptable authentication method')
        if method == 0x02:
            self._check_credentials(client_sock)

    def _check_credentials(self, client_sock):
        """Read a username/password sub-negotiation and verify it.

        Sends the status reply (0 = success, 1 = failure) before returning
        or raising, so the client is never left waiting.
        """
        import hmac  # local import: the file's import block is left untouched

        _version, ulen = self._recv_exact(client_sock, 2)
        uname = self._recv_exact(client_sock, ulen)
        plen = self._recv_exact(client_sock, 1)[0]
        passwd = self._recv_exact(client_sock, plen)

        # Compare as bytes so undecodable input cannot raise, and evaluate
        # both comparisons in constant time to avoid leaking which one failed.
        user_ok = hmac.compare_digest(uname, self.username.encode())
        pass_ok = hmac.compare_digest(passwd, self.password.encode())

        if not (user_ok and pass_ok):
            client_sock.sendall(bytes([1, 1]))
            raise Exception('authentication failed')

        client_sock.sendall(bytes([1, 0]))  # send back success

    def _read_request(self, client_sock):
        """Read the client's request and return ``(host, port)``.

        Only the CONNECT command (1) is supported. The address may be an
        IPv4 address (1), a domain name (3) or an IPv6 address (4).

        Raises:
            Exception: unsupported command or address type (an error reply
                is sent to the client first).
        """
        version, command, _reserved, addr_type = self._recv_exact(
            client_sock, 4)

        if version != 5 or command != 1:
            self._send_reply(client_sock, 0x07)  # command not supported
            raise Exception('unsupported command')

        if addr_type == 1:  # IPv4 address
            host_address = socket.inet_ntoa(self._recv_exact(client_sock, 4))
        elif addr_type == 3:  # domain name, prefixed by a one-byte length
            host_len = self._recv_exact(client_sock, 1)[0]
            host_address = str(
                self._recv_exact(client_sock, host_len), 'utf-8')
        elif addr_type == 4:  # IPv6 address
            host_address = socket.inet_ntop(
                socket.AF_INET6, self._recv_exact(client_sock, 16))
        else:
            self._send_reply(client_sock, 0x08)  # address type not supported
            raise Exception('unsupported address type')

        # The port is exactly two bytes; anything after it is payload.
        port_number = int.from_bytes(
            self._recv_exact(client_sock, 2), byteorder='big')
        return host_address, port_number

    def _connect_target(self, client_sock, host_address, port_number):
        """Connect to the target and tell the client how it went.

        Returns the connected, blocking-mode target socket.
        """
        try:
            # create_connection resolves names and handles IPv4 and IPv6.
            target_sock = socket.create_connection(
                (host_address, port_number), timeout=self.TIMEOUT)
        except ConnectionRefusedError:
            self._send_reply(client_sock, 0x05)  # connection refused
            raise
        except OSError:
            self._send_reply(client_sock, 0x01)  # general failure
            raise

        # The connect timeout must not leak into the data phase.
        target_sock.settimeout(None)
        try:
            # Without this success reply the client never starts sending.
            self._send_reply(client_sock, 0x00)
        except OSError:
            target_sock.close()
            raise
        return target_sock

    @staticmethod
    def _relay(client_sock, target_sock):
        """Copy data in both directions until either side closes."""
        peer_of = {client_sock: target_sock, target_sock: client_sock}
        inputs = [client_sock, target_sock]

        while True:
            readable, _, _ = select.select(inputs, [], [])
            for source in readable:
                data = source.recv(4096)
                if not data:
                    return
                peer_of[source].sendall(data)

if __name__ == '__main__':
    # Script entry point: start the proxy with placeholder credentials on
    # the default listen address and port.
    credentials = {'username': 'your_username', 'password': 'your_password'}
    proxy = Socks5Proxy(**credentials)
    proxy.start()

在客户端中,您可以使用以下代码测试该代理服务器(requests 需要先安装 SOCKS 支持:pip install "requests[socks]"):

import requests

# The credentials in the URL must match the username/password the proxy
# server was started with.
PROXY_URL = 'socks5://your_username:your_password@localhost:1080'

proxies = {
    'http': PROXY_URL,
    'https': PROXY_URL,
}

# Always pass a timeout: without one a stalled proxy makes the call hang
# forever instead of raising an error.
REQUEST_TIMEOUT = 15

response = requests.get("https://www.google.com/", proxies=proxies, timeout=REQUEST_TIMEOUT)
print(response.status_code) # should return 200 OK

response2 = requests.get("https://api.ipify.org/?format=json", proxies=proxies, timeout=REQUEST_TIMEOUT)
print(response2.json()['ip']) # should print your proxy IP address

请注意,该代码示例仅用于演示 SOCKS5 代理服务器的实现原理,并且没有考虑到一些安全和性能问题。在生产环境中使用时,请谨慎评估其安全性和性能表现。

本站部分文章来源于网络,版权归原作者所有,如有侵权请联系站长删除。
转载请注明出处:https://sdn.0voice.com/?id=1233

分享:
扫描分享到社交APP
上一篇
下一篇
发表列表
游客 游客
此处应有掌声~
评论列表

还没有评论,快来说点什么吧~

联系我们

在线咨询: 点击这里给我发消息

微信号:3007537140

上班时间: 10:30-22:30

关注我们
x

注册

已经有帐号?