# /user/bin/env python
__author__ = 'wenchong'
"""demo.py"""
import getpass
import paramiko
import os
import sys
import socket
import logging
from paramiko.py3compat import u
# windows系统无 termios 模块
try:
import termios
import tty
has_termios = True
except ImportError:
has_termios = False
# 成功登陆的用户名
USERNAME = None
def log_write(msg):
"""记录日志"""
fh = logging.FileHandler(filename='log', mode='a', encoding='utf-8')
fmt = logging.Formatter(fmt='%(asctime)s - {} - %(name)s - %(message)s'.format(USERNAME))
fh.setFormatter(fmt)
logger = logging.Logger("Command", level=logging.DEBUG)
logger.addHandler(fh)
logger.info(msg)
def interactive_shell(channel):
"""启动shell"""
if has_termios:
posix_shell(channel)
else:
windows_shell(channel)
def login():
"""模拟登陆堡垒机"""
while True:
username = input("Username: ")
password = getpass.getpass("Password: ")
if (username == 'wen' and password == '123') or (username == 'chong' and password == '123'):
global USERNAME
USERNAME = username
return username
else:
print("Username or Password is error. Please try again.")
def select_host(username):
"""根据登陆的用户名列出主机并选择"""
hosts = {
'wen': [
'10.211.55.5',
'192.168.165.130',
],
'chong': [
'10.211.55.6',
'10.211.55.7',
]
}
hosts_list = hosts.get(username)
for index, host in enumerate(hosts_list, 1):
print(index, host)
while True:
try:
user_input = input("Please select: ")
host = hosts_list[int(user_input) - 1]
return host
except KeyboardInterrupt as e:
exit("\n")
except Exception as e:
continue
def posix_shell(channel):
"""启用 Linux shell"""
import select
# 获取之前的 tty
fd = sys.stdin.fileno()
oldtty = termios.tcgetattr(fd)
try:
tty.setraw(fd)
tty.setcbreak(fd)
channel.settimeout(0.0)
command_list = []
tab_flag = False
while True:
r_list, w_list, e_list = select.select([channel, sys.stdin], [], [], 1)
if channel in r_list:
try:
x = u(channel.recv(1024))
if len(x) == 0:
print("\r\n*** EOF\r\n")
break
# 输入 tab 后的返回值如果不换行则记录为命令[补全命令]
if tab_flag:
if not x.startswith("\r\n"):
command_list.append(x)
tab_flag = False
sys.stdout.write(x)
sys.stdout.flush()
except socket.timeout:
pass
if sys.stdin in r_list:
x = sys.stdin.read(1)
if len(x) == 0:
break
# 用户输入 tab 键
if x == "\t":
tab_flag = True
else:
command_list.append(x)
if x == '\r':
command = ''.join(command_list)
# 发送的命令为空,即只有回车时忽略记录日志
if command != '\r':
log_write(command)
command_list.clear()
channel.sendall(x)
finally:
# 恢复之前的 tty
termios.tcsetattr(fd, termios.TCSADRAIN, oldtty)
def windows_shell(channel):
"""windows shell, 未验证"""
import threading
sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n")
def writeall(sock):
while True:
data = sock.recv(256)
if not data:
sys.stdout.write('\r\n*** EOF ***\r\n\r\n')
sys.stdout.flush()
break
sys.stdout.write(data)
sys.stdout.flush()
writer = threading.Thread(target=writeall, args=(channel,))
writer.start()
try:
while True:
d = sys.stdin.read(1)
if not d:
break
channel.send(d)
except EOFError:
# user hit ^Z or F6
pass
def login_server(host):
"""通过 key 认证登陆到远程服务器"""
transport = paramiko.Transport((host, 22))
transport.start_client()
default_path = os.path.join(os.environ['HOME'], '.ssh', 'id_rsa')
private_key = paramiko.RSAKey.from_private_key_file(default_path)
transport.auth_publickey(username='root', key=private_key)
# 打开一个通道
channel = transport.open_session()
# 获取一个终端
channel.get_pty()
# 激活器
channel.invoke_shell()
return channel
def main():
username = login()
if username:
host = select_host(username)
channel = login_server(host)
interactive_shell(channel)
if __name__ == '__main__':
main()