我有一种使用Python将AF-UNIX套接字转发到AF-INET套接字的方法:
def _ip_port_forward_by_using_a_socket_file(self, socket_file_path: str, to_ip_port: str):
"""
socket_file_path: string
a socket file like /tmp/message.socket
to_ip_port: string
It is something like this: 127.0.0.1:22
It is normally used to read wire socket from usb line, like '/dev/usb0'.
But you can also use it twice to forward data between two isolated container where file sharing is possible.
"""
import socket
import threading
import os
if (os.path.exists(socket_file_path)):
os.remove(socket_file_path)
def handle(buffer: Any, direction: Any, src_address: Any, src_port: Any, dst_address: Any, dst_port: Any):
'''
intercept the data flows between local port and the target port
'''
return buffer
def transfer(src: Any, dst: Any, direction: Any):
error = None
try:
src_address, src_port = src.getsockname()
dst_address, dst_port = dst.getsockname()
except Exception as e:
error = True
print(e)
while True:
try:
buffer = src.recv(4096)
if len(buffer) > 0:
if error == True:
dst.send(buffer)
else:
dst.send(handle(buffer, direction, src_address, src_port, dst_address, dst_port))
except Exception as e:
print("error: ", repr(e))
break
src.close()
dst.close()
def server(socket_file_path: str, remote_host: Any, remote_port: Any):
server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(socket_file_path)
server_socket.listen(0x40)
while True:
src_socket, src_address = server_socket.accept()
try:
dst_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
dst_socket.connect((remote_host, remote_port))
s = threading.Thread(target=transfer, args=(dst_socket, src_socket, False))
r = threading.Thread(target=transfer, args=(src_socket, dst_socket, True))
s.start()
r.start()
except Exception as e:
print("error: ", repr(e))
to_ = to_ip_port.split(":")
server(socket_file_path=socket_file_path, remote_host=to_[0], remote_port=int(to_[1]))