How to build an SSH honeypot in Python and Docker - Part 1
In today's blog post I'll explain how to build a low-interaction SSH honeypot in Python and containerise it in Docker. In Part 2, we'll add functionality to collect malware samples -- ideal for malware analysis.
One of the key design goals of this honeypot is efficiency. Why? Well, if the honeypot has low resource requirements (RAM, CPU, etc), it means we can deploy it to a global infrastructure -- such as multiple virtual private servers (VPS) -- without incurring large costs.
Now, before we go on, I should mention that existing honeypot solutions are available. However, some of them can be quite resource intensive. I've curated a list of popular honeypots here: A Curated List of Awesome Honeypots.
To implement the SSH protocol in our honeypot we'll be using the Python library Paramiko (GitHub repo).
Contents
Prerequisites
Before we can start building the honeypot we need to setup a Dockerfile
and requirements.txt
file.
Dockerfile
Let's start with the Dockerfile
. For this, we'll be using the standard Python3 image:
FROM python:3
WORKDIR /usr/src/app
# set environment variables
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1
# install dependencies
RUN pip3 install --upgrade pip
COPY ./requirements.txt .
RUN pip3 install -r requirements.txt
COPY . .
Requirements.txt
This bit's quite straightforward, as the only external library we'll be using is Paramiko
. So, the requirements.txt
file looks like this:
paramiko
With that all setup and ready to go, let's start building the actual honeypot!
Building the honeypot
Our main requirements for the honeypot are to have a convincing SSH server implementation, including emulated commands, and some way of logging the usernames, passwords, and other metadata we can gather.
Creating a logger
First, need a way to log the attacks our honeypot receives. To keep things simple, we'll use Python's logging
library:
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO,
filename='ssh_honeypot.log')
This code basically sets the logging library to save all logs to ssh_honeypot.log
. We've set the logging format to display useful info such as the timestamp of when the event occurred, then we'll provide messages to log later in the honeypot code.
Implementing the honeypot
For the basic SSH honeypot setup we'll use paramiko's ServerInterface
class, implementing the following methods:
class BasicSshHoneypot(paramiko.ServerInterface):
def check_channel_request(self, kind, chanid):
def get_allowed_auths(self, username):
def check_auth_publickey(self, username, key):
def check_auth_password(self, username, password):
def check_channel_shell_request(self, channel):
def check_channel_pty_request(self, channel, term, width, height, pixelwidth, pixelheight, modes):
def check_channel_exec_request(self, channel, command):
Each method needs to return a specific response for the SSH server to work (see full details in Paramiko's Server implementation docs). This is also the ideal place to log things like authentication details. So, let's go ahead and populate these methods:
class BasicSshHoneypot(paramiko.ServerInterface):
client_ip = None
def __init__(self, client_ip):
self.client_ip = client_ip
self.event = threading.Event()
def check_channel_request(self, kind, chanid):
logging.info('client called check_channel_request ({}): {}'.format(
self.client_ip, kind))
if kind == 'session':
return paramiko.OPEN_SUCCEEDED
def get_allowed_auths(self, username):
logging.info('client called get_allowed_auths ({}) with username {}'.format(
self.client_ip, username))
return "publickey,password"
def check_auth_publickey(self, username, key):
fingerprint = u(hexlify(key.get_fingerprint()))
logging.info('client public key ({}): {}, username: {}, key name: {}, md5 fingerprint: {}, base64: {}, bits: {}'.format(
self.client_ip, username, key.get_name(), fingerprint, key.get_base64(), key.get_bits()))
return paramiko.AUTH_PARTIALLY_SUCCESSFUL
def check_auth_password(self, username, password):
# Accept all passwords as valid by default
logging.info('new client credentials ({}): username: {}, password: {}'.format(
self.client_ip, username, password))
return paramiko.AUTH_SUCCESSFUL
def check_channel_shell_request(self, channel):
self.event.set()
return True
def check_channel_pty_request(self, channel, term, width, height, pixelwidth, pixelheight, modes):
return True
def check_channel_exec_request(self, channel, command):
command_text = str(command.decode("utf-8"))
logging.info('client sent command via check_channel_exec_request ({}): {}'.format(
self.client_ip, username, command))
return True
The above code gives us a basic implementation of an SSH server which will accept any credentials. It also logs every username and password, along with the client's IP address.
The method check_auth_publickey
logs the client's public authentication key then returns AUTH_PARTIALLY_SUCCESSFUL
(i.e. tells the client that a password is still required). The method check_auth_password
logs the client's username and password, then returns AUTH_SUCCESSFUL
.
Emulating commands
An important function of our SSH honeypot is to respond to commands. Now, we don't have time to implement every known Unix command under the sun. But, we can implement some common commands.
The code below implements the ls
(list directory contents) and pwd
(print working directory) commands. For the command ls
the honeypot simply returns users.txt
to the user (i.e. that's the only file in the current directory). The command pwd
returns /home/root
as the current directory:
def handle_cmd(cmd, chan, ip):
response = ""
if cmd.startswith("ls"):
response = "users.txt"
elif cmd.startswith("pwd"):
response = "/home/root"
if response != '':
logging.info('Response from honeypot ({}): '.format(ip, response))
response = response + "\r\n"
chan.send(response)
Feel free to add your own emulated commands to this function.
Handling connections
Next up, we need to handle new connections for the honeypot. Some important parts here include sending an SSH banner to the client (so the honeypot looks more like a real server) and logging useful metadata about the client (such as SSH client version, cipher, compression, etc).
Once a client has successfully connected, we send Welcome to Ubuntu 18.04.4 LTS (GNU/Linux 4.15.0-128-generic x86_64)
(feel free to change that to your preferred OS of choice). Then we start a new terminal session with $
-- and log any commands received from the client.
def handle_connection(client, addr):
client_ip = addr[0]
logging.info('New connection from: {}'.format(client_ip))
try:
transport = paramiko.Transport(client)
transport.add_server_key(HOST_KEY)
transport.local_version = SSH_BANNER # Change banner to appear more convincing
server = BasicSshHoneypot(client_ip)
try:
transport.start_server(server=server)
except paramiko.SSHException:
print('*** SSH negotiation failed.')
raise Exception("SSH negotiation failed")
# wait for auth
chan = transport.accept(10)
if chan is None:
print('*** No channel (from '+client_ip+').')
raise Exception("No channel")
chan.settimeout(10)
if transport.remote_mac != '':
logging.info('Client mac ({}): {}'.format(client_ip, transport.remote_mac))
if transport.remote_compression != '':
logging.info('Client compression ({}): {}'.format(client_ip, transport.remote_compression))
if transport.remote_version != '':
logging.info('Client SSH version ({}): {}'.format(client_ip, transport.remote_version))
if transport.remote_cipher != '':
logging.info('Client SSH cipher ({}): {}'.format(client_ip, transport.remote_cipher))
server.event.wait(10)
if not server.event.is_set():
logging.info('** Client ({}): never asked for a shell'.format(client_ip))
raise Exception("No shell request")
try:
chan.send("Welcome to Ubuntu 18.04.4 LTS (GNU/Linux 4.15.0-128-generic x86_64)\r\n\r\n")
run = True
while run:
chan.send("$ ")
command = ""
while not command.endswith("\r"):
transport = chan.recv(1024)
print(client_ip+"- received:",transport)
# Echo input to psuedo-simulate a basic terminal
if(
transport != UP_KEY
and transport != DOWN_KEY
and transport != LEFT_KEY
and transport != RIGHT_KEY
and transport != BACK_KEY
):
chan.send(transport)
command += transport.decode("utf-8")
chan.send("\r\n")
command = command.rstrip()
logging.info('Command receied ({}): {}'.format(client_ip, command))
if command == "exit":
settings.addLogEntry("Connection closed (via exit command): " + client_ip + "\n")
run = False
else:
handle_cmd(command, chan, client_ip)
except Exception as err:
print('!!! Exception: {}: {}'.format(err.__class__, err))
try:
transport.close()
except Exception:
pass
chan.close()
except Exception as err:
print('!!! Exception: {}: {}'.format(err.__class__, err))
try:
transport.close()
except Exception:
pass
You'll notice the code sends every character received back to the client. This is to emulate a normal terminal session (i.e. as the client writes a command, it appears on the terminal). But the code doesn't send back arrow keys (up, down, left, right) or the back key. This is to stop the client's cursor from moving around the terminal (as per a normal SSH terminal session).
Initiating the server
Now it's time to get the SSH honeypot server started. We'll use Python's socket
library to open a port and bind it to our application.
We also use Python's threading
library to create a new threat to handle each connection. This allows our SSH honeypot server to handle multiple connections simultaneously.
def start_server(port, bind):
"""Init and run the ssh server"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((bind, port))
except Exception as err:
print('*** Bind failed: {}'.format(err))
traceback.print_exc()
sys.exit(1)
threads = []
while True:
try:
sock.listen(100)
print('Listening for connection ...')
client, addr = sock.accept()
except Exception as err:
print('*** Listen/accept failed: {}'.format(err))
traceback.print_exc()
new_thread = threading.Thread(target=handle_connection, args=(client, addr))
new_thread.start()
threads.append(new_thread)
for thread in threads:
thread.join()
All that's left to do now is call the start_server
method with the port
and bind
parameters, and we're good to go!
The main function
Before we call start_server
we want to receive the port
and bind
parameters via arguments. Then set those arguments as the method's parameters:
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Run an SSH honeypot server')
parser.add_argument("--port", "-p", help="The port to bind the ssh server to (default 22)", default=2222, type=int, action="store")
parser.add_argument("--bind", "-b", help="The address to bind the ssh server to", default="", type=str, action="store")
args = parser.parse_args()
start_server(args.port, args.bind)
This basically means we run the following command to start our new SSH honeypot (on port 2222 at 127.0.0.1):
basic_ssh_honeypot.py -p 2222 -b 127.0.0.1
The finished Python file
Let's bring everything together for our complete file. This includes all the library imports and global variables (SSH banner text, arrow keys, etc).
We also need to import the server's key as server.key
(we'll come to creating that key in a moment).
The complete basic_ssh_honeypot.py
Python file:
#!/usr/bin/env python
import argparse
import threading
import socket
import sys
import os
import traceback
import logger
import json
import paramiko
from datetime import datetime
from binascii import hexlify
from paramiko.py3compat import b, u, decodebytes
HOST_KEY = paramiko.RSAKey(filename='server.key')
SSH_BANNER = "SSH-2.0-OpenSSH_8.2p1 Ubuntu-4ubuntu0.1"
UP_KEY = '\x1b[A'.encode()
DOWN_KEY = '\x1b[B'.encode()
RIGHT_KEY = '\x1b[C'.encode()
LEFT_KEY = '\x1b[D'.encode()
BACK_KEY = '\x7f'.encode()
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO,
filename='ssh_honeypot.log')
def handle_cmd(cmd, chan, ip):
response = ""
if cmd.startswith("ls"):
response = "users.txt"
elif cmd.startswith("pwd"):
response = "/home/root"
if response != '':
logging.info('Response from honeypot ({}): '.format(ip, response))
response = response + "\r\n"
chan.send(response)
class BasicSshHoneypot(paramiko.ServerInterface):
client_ip = None
def __init__(self, client_ip):
self.client_ip = client_ip
self.event = threading.Event()
def check_channel_request(self, kind, chanid):
logging.info('client called check_channel_request ({}): {}'.format(
self.client_ip, kind))
if kind == 'session':
return paramiko.OPEN_SUCCEEDED
def get_allowed_auths(self, username):
logging.info('client called get_allowed_auths ({}) with username {}'.format(
self.client_ip, username))
return "publickey,password"
def check_auth_publickey(self, username, key):
fingerprint = u(hexlify(key.get_fingerprint()))
logging.info('client public key ({}): username: {}, key name: {}, md5 fingerprint: {}, base64: {}, bits: {}'.format(
self.client_ip, username, key.get_name(), fingerprint, key.get_base64(), key.get_bits()))
return paramiko.AUTH_PARTIALLY_SUCCESSFUL
def check_auth_password(self, username, password):
# Accept all passwords as valid by default
logging.info('new client credentials ({}): username: {}, password: {}'.format(
self.client_ip, username, password))
return paramiko.AUTH_SUCCESSFUL
def check_channel_shell_request(self, channel):
self.event.set()
return True
def check_channel_pty_request(self, channel, term, width, height, pixelwidth, pixelheight, modes):
return True
def check_channel_exec_request(self, channel, command):
command_text = str(command.decode("utf-8"))
logging.info('client sent command via check_channel_exec_request ({}): {}'.format(
self.client_ip, username, command))
return True
def handle_connection(client, addr):
client_ip = addr[0]
logging.info('New connection from: {}'.format(client_ip))
try:
transport = paramiko.Transport(client)
transport.add_server_key(HOST_KEY)
transport.local_version = SSH_BANNER # Change banner to appear more convincing
server = BasicSshHoneypot(client_ip)
try:
transport.start_server(server=server)
except paramiko.SSHException:
print('*** SSH negotiation failed.')
raise Exception("SSH negotiation failed")
# wait for auth
chan = transport.accept(10)
if chan is None:
print('*** No channel (from '+client_ip+').')
raise Exception("No channel")
chan.settimeout(10)
if transport.remote_mac != '':
logging.info('Client mac ({}): {}'.format(client_ip, transport.remote_mac))
if transport.remote_compression != '':
logging.info('Client compression ({}): {}'.format(client_ip, transport.remote_compression))
if transport.remote_version != '':
logging.info('Client SSH version ({}): {}'.format(client_ip, transport.remote_version))
if transport.remote_cipher != '':
logging.info('Client SSH cipher ({}): {}'.format(client_ip, transport.remote_cipher))
server.event.wait(10)
if not server.event.is_set():
logging.info('** Client ({}): never asked for a shell'.format(client_ip))
raise Exception("No shell request")
try:
chan.send("Welcome to Ubuntu 18.04.4 LTS (GNU/Linux 4.15.0-128-generic x86_64)\r\n\r\n")
run = True
while run:
chan.send("$ ")
command = ""
while not command.endswith("\r"):
transport = chan.recv(1024)
print(client_ip+"- received:",transport)
# Echo input to psuedo-simulate a basic terminal
if(
transport != UP_KEY
and transport != DOWN_KEY
and transport != LEFT_KEY
and transport != RIGHT_KEY
and transport != BACK_KEY
):
chan.send(transport)
command += transport.decode("utf-8")
chan.send("\r\n")
command = command.rstrip()
logging.info('Command receied ({}): {}'.format(client_ip, command))
detect_url(command, client_ip)
if command == "exit":
settings.addLogEntry("Connection closed (via exit command): " + client_ip + "\n")
run = False
else:
handle_cmd(command, chan, client_ip)
except Exception as err:
print('!!! Exception: {}: {}'.format(err.__class__, err))
try:
transport.close()
except Exception:
pass
chan.close()
except Exception as err:
print('!!! Exception: {}: {}'.format(err.__class__, err))
try:
transport.close()
except Exception:
pass
def start_server(port, bind):
"""Init and run the ssh server"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((bind, port))
except Exception as err:
print('*** Bind failed: {}'.format(err))
traceback.print_exc()
sys.exit(1)
threads = []
while True:
try:
sock.listen(100)
print('Listening for connection ...')
client, addr = sock.accept()
except Exception as err:
print('*** Listen/accept failed: {}'.format(err))
traceback.print_exc()
new_thread = threading.Thread(target=handle_connection, args=(client, addr))
new_thread.start()
threads.append(new_thread)
for thread in threads:
thread.join()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Run an SSH honeypot server')
parser.add_argument("--port", "-p", help="The port to bind the ssh server to (default 22)", default=2222, type=int, action="store")
parser.add_argument("--bind", "-b", help="The address to bind the ssh server to", default="", type=str, action="store")
args = parser.parse_args()
start_server(args.port, args.bind)
Running the honeypot
So, now we have our complete honeypot implementation, let's get everything setup and run the honeypot!
Port forwarding
First, security. The default port for SSH is 22. However, running an application on port 22 requires admin privileges. Never run a honeypot with admin privileges. Why? If an attacker somehow breaks out of the honeypot, and they're in an area running under admin, then we have a compromised system on our hands. Not fun. So, to be safe, setup a firewall to forward port 22 to a non-privileged port, such as 2222. That way, you can run the Dockerised honeypot as a non-admin user.
To setup a port forward from 22 to 2222 we can add the following rule to iptables
:
iptables -A PREROUTING -t nat -p tcp --dport 22 -j REDIRECT --to-port 2222
Setup the server's key
Earlier, I mentioned that our honeypot will need to access the server.key
file. To do that, we first need to create the key using the following command:
ssh-keygen -t rsa -f server.key
Then rename the public key from server.key.pub
to server.pub
with the following command:
mv server.key.pub server.pub
Let the attacks begin!
Having setup the port forward and server's SSH key, it's now time to run our new SSH honeypot! I'm going to presume you already have Docker installed (if not see Get Docker).
The directory that your honeypot is in should contain the following files:
- basic_ssh_honeypot.py
- Dockerfile
- requirements.txt
- server.key
- server.pub
Once you've confirmed they're all in place, we need to build the docker image:
docker build -t basic_honeypot .
Once that's done, we can run the Dockerised honeypot with:
docker run -v ${PWD}:/usr/src/app -p 2222:2222 basic_honeypot
The parameter -v ${PWD}:/usr/src/app
sets the current directory as the Docker container's volume. This means the file ssh_honeypot.log
will be created in your honeypot's directory (instead of docker's less-accessible virtual volume).
With the honeypot running, you should see new connections appear in the ssh_honeypot.log
file. Try testing it by connection to the honeypot with the usual SSH command:
ssh test@[honeypot-ip]
You should see your connection appear in the log file ssh_honeypot.log
.
GitHub repo
All the code for this SSH honeypot is available on my GitHub repo at: github.com/sjbell/basic_ssh_honeypot.
Coming up in Part 2: we'll add functionality to collect malware samples that are uploaded to the honeypot.
Main image credit: A bundle of optical fibers. by Denny Muller. Logos for Python and Docker.
About the author
Simon Bell is an award-winning Cyber Security Researcher, Software Engineer, and Web Security Specialist. Simon's research papers have been published internationally, and his findings have featured in Ars Technica, The Hacker News, PC World, among others. He founded Secure Honey, an open-source honeypot and threat intelligence project, in 2013. He has a PhD in Cyber Security from Royal Holloway's world-leading Information Security Group.
Follow Simon on Twitter: @SimonByte