SSH honeypot, deployed in the wild, collecting and sharing data

How to build an SSH honeypot in Python and Docker - Part 1

7 Aug 2021 • 14 min read

Python and Docker network

In today's blog post I'll explain how to build a low-interaction SSH honeypot in Python and containerise it in Docker. In Part 2, we'll add functionality to collect malware samples -- ideal for malware analysis.

One of the key design goals of this honeypot is efficiency. Why? Well, if the honeypot has low resource requirements (RAM, CPU, etc), it means we can deploy it to a global infrastructure -- such as multiple virtual private servers (VPS) -- without incurring large costs.

Now, before we go on, I should mention that existing honeypot solutions are available. However, some of them can be quite resource intensive. I've curated a list of popular honeypots here: A Curated List of Awesome Honeypots.

To implement the SSH protocol in our honeypot we'll be using the Python library Paramiko (GitHub repo).

Contents

Prerequisites

Before we can start building the honeypot we need to setup a Dockerfile and requirements.txt file.

Dockerfile

Let's start with the Dockerfile. For this, we'll be using the standard Python3 image:

FROM python:3

WORKDIR /usr/src/app

# set environment variables
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1

# install dependencies
RUN pip3 install --upgrade pip
COPY ./requirements.txt .
RUN pip3 install -r requirements.txt

COPY . .

 

Requirements.txt

This bit's quite straightforward, as the only external library we'll be using is Paramiko. So, the requirements.txt file looks like this:

paramiko

With that all setup and ready to go, let's start building the actual honeypot!

Building the honeypot

Our main requirements for the honeypot are to have a convincing SSH server implementation, including emulated commands, and some way of logging the usernames, passwords, and other metadata we can gather.

Creating a logger

First, need a way to log the attacks our honeypot receives. To keep things simple, we'll use Python's logging library:

logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    filename='ssh_honeypot.log')

This code basically sets the logging library to save all logs to ssh_honeypot.log. We've set the logging format to display useful info such as the timestamp of when the event occurred, then we'll provide messages to log later in the honeypot code.

Implementing the honeypot

For the basic SSH honeypot setup we'll use paramiko's ServerInterface class, implementing the following methods:

class BasicSshHoneypot(paramiko.ServerInterface):

    def check_channel_request(self, kind, chanid):

    def get_allowed_auths(self, username):

    def check_auth_publickey(self, username, key):

    def check_auth_password(self, username, password):

    def check_channel_shell_request(self, channel):

    def check_channel_pty_request(self, channel, term, width, height, pixelwidth, pixelheight, modes):

    def check_channel_exec_request(self, channel, command):

Each method needs to return a specific response for the SSH server to work (see full details in Paramiko's Server implementation docs). This is also the ideal place to log things like authentication details. So, let's go ahead and populate these methods:


class BasicSshHoneypot(paramiko.ServerInterface):

    client_ip = None

    def __init__(self, client_ip):
        self.client_ip = client_ip
        self.event = threading.Event()

    def check_channel_request(self, kind, chanid):
        logging.info('client called check_channel_request ({}): {}'.format(
                    self.client_ip, kind))
        if kind == 'session':
            return paramiko.OPEN_SUCCEEDED

    def get_allowed_auths(self, username):
        logging.info('client called get_allowed_auths ({}) with username {}'.format(
                    self.client_ip, username))
        return "publickey,password"

    def check_auth_publickey(self, username, key):
        fingerprint = u(hexlify(key.get_fingerprint()))
        logging.info('client public key ({}): {}, username: {}, key name: {}, md5 fingerprint: {}, base64: {}, bits: {}'.format(
                    self.client_ip, username, key.get_name(), fingerprint, key.get_base64(), key.get_bits()))
        return paramiko.AUTH_PARTIALLY_SUCCESSFUL        

    def check_auth_password(self, username, password):
        # Accept all passwords as valid by default
        logging.info('new client credentials ({}): username: {}, password: {}'.format(
                    self.client_ip, username, password))
        return paramiko.AUTH_SUCCESSFUL

    def check_channel_shell_request(self, channel):
        self.event.set()
        return True

    def check_channel_pty_request(self, channel, term, width, height, pixelwidth, pixelheight, modes):
        return True

    def check_channel_exec_request(self, channel, command):
        command_text = str(command.decode("utf-8"))

        logging.info('client sent command via check_channel_exec_request ({}): {}'.format(
                    self.client_ip, username, command))
        return True

The above code gives us a basic implementation of an SSH server which will accept any credentials. It also logs every username and password, along with the client's IP address.

The method check_auth_publickey logs the client's public authentication key then returns AUTH_PARTIALLY_SUCCESSFUL (i.e. tells the client that a password is still required). The method check_auth_password logs the client's username and password, then returns AUTH_SUCCESSFUL.

Emulating commands

An important function of our SSH honeypot is to respond to commands. Now, we don't have time to implement every known Unix command under the sun. But, we can implement some common commands.

The code below implements the ls (list directory contents) and pwd (print working directory) commands. For the command ls the honeypot simply returns users.txt to the user (i.e. that's the only file in the current directory). The command pwd returns /home/root as the current directory:

def handle_cmd(cmd, chan, ip):

    response = ""
    if cmd.startswith("ls"):
        response = "users.txt"
    elif cmd.startswith("pwd"):
        response = "/home/root"

    if response != '':
        logging.info('Response from honeypot ({}): '.format(ip, response))
        response = response + "\r\n"
    chan.send(response)

Feel free to add your own emulated commands to this function.

Handling connections

Next up, we need to handle new connections for the honeypot. Some important parts here include sending an SSH banner to the client (so the honeypot looks more like a real server) and logging useful metadata about the client (such as SSH client version, cipher, compression, etc).

Once a client has successfully connected, we send Welcome to Ubuntu 18.04.4 LTS (GNU/Linux 4.15.0-128-generic x86_64) (feel free to change that to your preferred OS of choice). Then we start a new terminal session with $ -- and log any commands received from the client.

def handle_connection(client, addr):

    client_ip = addr[0]
    logging.info('New connection from: {}'.format(client_ip))

    try:
        transport = paramiko.Transport(client)
        transport.add_server_key(HOST_KEY)
        transport.local_version = SSH_BANNER # Change banner to appear more convincing
        server = BasicSshHoneypot(client_ip)
        try:
            transport.start_server(server=server)

        except paramiko.SSHException:
            print('*** SSH negotiation failed.')
            raise Exception("SSH negotiation failed")

        # wait for auth
        chan = transport.accept(10)
        if chan is None:
            print('*** No channel (from '+client_ip+').')
            raise Exception("No channel")
        
        chan.settimeout(10)

        if transport.remote_mac != '':
            logging.info('Client mac ({}): {}'.format(client_ip, transport.remote_mac))

        if transport.remote_compression != '':
            logging.info('Client compression ({}): {}'.format(client_ip, transport.remote_compression))

        if transport.remote_version != '':
            logging.info('Client SSH version ({}): {}'.format(client_ip, transport.remote_version))
            
        if transport.remote_cipher != '':
            logging.info('Client SSH cipher ({}): {}'.format(client_ip, transport.remote_cipher))

        server.event.wait(10)
        if not server.event.is_set():
            logging.info('** Client ({}): never asked for a shell'.format(client_ip))
            raise Exception("No shell request")
     
        try:
            chan.send("Welcome to Ubuntu 18.04.4 LTS (GNU/Linux 4.15.0-128-generic x86_64)\r\n\r\n")
            run = True
            while run:
                chan.send("$ ")
                command = ""
                while not command.endswith("\r"):
                    transport = chan.recv(1024)
                    print(client_ip+"- received:",transport)
                    # Echo input to psuedo-simulate a basic terminal
                    if(
                        transport != UP_KEY
                        and transport != DOWN_KEY
                        and transport != LEFT_KEY
                        and transport != RIGHT_KEY
                        and transport != BACK_KEY
                    ):
                        chan.send(transport)
                        command += transport.decode("utf-8")
                
                chan.send("\r\n")
                command = command.rstrip()
                logging.info('Command receied ({}): {}'.format(client_ip, command))

                if command == "exit":
                    settings.addLogEntry("Connection closed (via exit command): " + client_ip + "\n")
                    run = False

                else:
                    handle_cmd(command, chan, client_ip)

        except Exception as err:
            print('!!! Exception: {}: {}'.format(err.__class__, err))
            try:
                transport.close()
            except Exception:
                pass

        chan.close()

    except Exception as err:
        print('!!! Exception: {}: {}'.format(err.__class__, err))
        try:
            transport.close()
        except Exception:
            pass

You'll notice the code sends every character received back to the client. This is to emulate a normal terminal session (i.e. as the client writes a command, it appears on the terminal). But the code doesn't send back arrow keys (up, down, left, right) or the back key. This is to stop the client's cursor from moving around the terminal (as per a normal SSH terminal session).

Initiating the server

Now it's time to get the SSH honeypot server started. We'll use Python's socket library to open a port and bind it to our application.

We also use Python's threading library to create a new threat to handle each connection. This allows our SSH honeypot server to handle multiple connections simultaneously.

def start_server(port, bind):
    """Init and run the ssh server"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((bind, port))
    except Exception as err:
        print('*** Bind failed: {}'.format(err))
        traceback.print_exc()
        sys.exit(1)

    threads = []
    while True:
        try:
            sock.listen(100)
            print('Listening for connection ...')
            client, addr = sock.accept()
        except Exception as err:
            print('*** Listen/accept failed: {}'.format(err))
            traceback.print_exc()
        new_thread = threading.Thread(target=handle_connection, args=(client, addr))
        new_thread.start()
        threads.append(new_thread)

    for thread in threads:
        thread.join()

All that's left to do now is call the start_server method with the port and bind parameters, and we're good to go!

The main function

Before we call start_server we want to receive the port and bind parameters via arguments. Then set those arguments as the method's parameters:

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Run an SSH honeypot server')
    parser.add_argument("--port", "-p", help="The port to bind the ssh server to (default 22)", default=2222, type=int, action="store")
    parser.add_argument("--bind", "-b", help="The address to bind the ssh server to", default="", type=str, action="store")
    args = parser.parse_args()
    start_server(args.port, args.bind)

This basically means we run the following command to start our new SSH honeypot (on port 2222 at 127.0.0.1):

basic_ssh_honeypot.py -p 2222 -b 127.0.0.1

 

The finished Python file

Let's bring everything together for our complete file. This includes all the library imports and global variables (SSH banner text, arrow keys, etc).

We also need to import the server's key as server.key (we'll come to creating that key in a moment).

The complete basic_ssh_honeypot.py Python file:

#!/usr/bin/env python
import argparse
import threading
import socket
import sys
import os
import traceback
import logger
import json
import paramiko
from datetime import datetime
from binascii import hexlify
from paramiko.py3compat import b, u, decodebytes

HOST_KEY = paramiko.RSAKey(filename='server.key')
SSH_BANNER = "SSH-2.0-OpenSSH_8.2p1 Ubuntu-4ubuntu0.1"

UP_KEY = '\x1b[A'.encode()
DOWN_KEY = '\x1b[B'.encode()
RIGHT_KEY = '\x1b[C'.encode()
LEFT_KEY = '\x1b[D'.encode()
BACK_KEY = '\x7f'.encode()

logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    filename='ssh_honeypot.log')

def handle_cmd(cmd, chan, ip):

    response = ""
    if cmd.startswith("ls"):
        response = "users.txt"
    elif cmd.startswith("pwd"):
        response = "/home/root"

    if response != '':
        logging.info('Response from honeypot ({}): '.format(ip, response))
        response = response + "\r\n"
    chan.send(response)


class BasicSshHoneypot(paramiko.ServerInterface):

    client_ip = None

    def __init__(self, client_ip):
        self.client_ip = client_ip
        self.event = threading.Event()

    def check_channel_request(self, kind, chanid):
        logging.info('client called check_channel_request ({}): {}'.format(
                    self.client_ip, kind))
        if kind == 'session':
            return paramiko.OPEN_SUCCEEDED

    def get_allowed_auths(self, username):
        logging.info('client called get_allowed_auths ({}) with username {}'.format(
                    self.client_ip, username))
        return "publickey,password"

    def check_auth_publickey(self, username, key):
        fingerprint = u(hexlify(key.get_fingerprint()))
        logging.info('client public key ({}): username: {}, key name: {}, md5 fingerprint: {}, base64: {}, bits: {}'.format(
                    self.client_ip, username, key.get_name(), fingerprint, key.get_base64(), key.get_bits()))
        return paramiko.AUTH_PARTIALLY_SUCCESSFUL        

    def check_auth_password(self, username, password):
        # Accept all passwords as valid by default
        logging.info('new client credentials ({}): username: {}, password: {}'.format(
                    self.client_ip, username, password))
        return paramiko.AUTH_SUCCESSFUL

    def check_channel_shell_request(self, channel):
        self.event.set()
        return True

    def check_channel_pty_request(self, channel, term, width, height, pixelwidth, pixelheight, modes):
        return True

    def check_channel_exec_request(self, channel, command):
        command_text = str(command.decode("utf-8"))

        logging.info('client sent command via check_channel_exec_request ({}): {}'.format(
                    self.client_ip, username, command))
        return True


def handle_connection(client, addr):

    client_ip = addr[0]
    logging.info('New connection from: {}'.format(client_ip))

    try:
        transport = paramiko.Transport(client)
        transport.add_server_key(HOST_KEY)
        transport.local_version = SSH_BANNER # Change banner to appear more convincing
        server = BasicSshHoneypot(client_ip)
        try:
            transport.start_server(server=server)

        except paramiko.SSHException:
            print('*** SSH negotiation failed.')
            raise Exception("SSH negotiation failed")

        # wait for auth
        chan = transport.accept(10)
        if chan is None:
            print('*** No channel (from '+client_ip+').')
            raise Exception("No channel")
        
        chan.settimeout(10)

        if transport.remote_mac != '':
            logging.info('Client mac ({}): {}'.format(client_ip, transport.remote_mac))

        if transport.remote_compression != '':
            logging.info('Client compression ({}): {}'.format(client_ip, transport.remote_compression))

        if transport.remote_version != '':
            logging.info('Client SSH version ({}): {}'.format(client_ip, transport.remote_version))
            
        if transport.remote_cipher != '':
            logging.info('Client SSH cipher ({}): {}'.format(client_ip, transport.remote_cipher))

        server.event.wait(10)
        if not server.event.is_set():
            logging.info('** Client ({}): never asked for a shell'.format(client_ip))
            raise Exception("No shell request")
     
        try:
            chan.send("Welcome to Ubuntu 18.04.4 LTS (GNU/Linux 4.15.0-128-generic x86_64)\r\n\r\n")
            run = True
            while run:
                chan.send("$ ")
                command = ""
                while not command.endswith("\r"):
                    transport = chan.recv(1024)
                    print(client_ip+"- received:",transport)
                    # Echo input to psuedo-simulate a basic terminal
                    if(
                        transport != UP_KEY
                        and transport != DOWN_KEY
                        and transport != LEFT_KEY
                        and transport != RIGHT_KEY
                        and transport != BACK_KEY
                    ):
                        chan.send(transport)
                        command += transport.decode("utf-8")
                
                chan.send("\r\n")
                command = command.rstrip()
                logging.info('Command receied ({}): {}'.format(client_ip, command))
                detect_url(command, client_ip)

                if command == "exit":
                    settings.addLogEntry("Connection closed (via exit command): " + client_ip + "\n")
                    run = False

                else:
                    handle_cmd(command, chan, client_ip)

        except Exception as err:
            print('!!! Exception: {}: {}'.format(err.__class__, err))
            try:
                transport.close()
            except Exception:
                pass

        chan.close()

    except Exception as err:
        print('!!! Exception: {}: {}'.format(err.__class__, err))
        try:
            transport.close()
        except Exception:
            pass


def start_server(port, bind):
    """Init and run the ssh server"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((bind, port))
    except Exception as err:
        print('*** Bind failed: {}'.format(err))
        traceback.print_exc()
        sys.exit(1)

    threads = []
    while True:
        try:
            sock.listen(100)
            print('Listening for connection ...')
            client, addr = sock.accept()
        except Exception as err:
            print('*** Listen/accept failed: {}'.format(err))
            traceback.print_exc()
        new_thread = threading.Thread(target=handle_connection, args=(client, addr))
        new_thread.start()
        threads.append(new_thread)

    for thread in threads:
        thread.join()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Run an SSH honeypot server')
    parser.add_argument("--port", "-p", help="The port to bind the ssh server to (default 22)", default=2222, type=int, action="store")
    parser.add_argument("--bind", "-b", help="The address to bind the ssh server to", default="", type=str, action="store")
    args = parser.parse_args()
    start_server(args.port, args.bind)

 

Running the honeypot

So, now we have our complete honeypot implementation, let's get everything setup and run the honeypot!

Port forwarding

First, security. The default port for SSH is 22. However, running an application on port 22 requires admin privileges. Never run a honeypot with admin privileges. Why? If an attacker somehow breaks out of the honeypot, and they're in an area running under admin, then we have a compromised system on our hands. Not fun. So, to be safe, setup a firewall to forward port 22 to a non-privileged port, such as 2222. That way, you can run the Dockerised honeypot as a non-admin user.

To setup a port forward from 22 to 2222 we can add the following rule to iptables:

iptables -A PREROUTING -t nat -p tcp --dport 22 -j REDIRECT --to-port 2222

 

Setup the server's key

Earlier, I mentioned that our honeypot will need to access the server.key file. To do that, we first need to create the key using the following command:

ssh-keygen -t rsa -f server.key

Then rename the public key from server.key.pub to server.pub with the following command:

mv server.key.pub server.pub

 

Let the attacks begin!

Having setup the port forward and server's SSH key, it's now time to run our new SSH honeypot! I'm going to presume you already have Docker installed (if not see Get Docker).

The directory that your honeypot is in should contain the following files:

  • basic_ssh_honeypot.py
  • Dockerfile
  • requirements.txt
  • server.key
  • server.pub

Once you've confirmed they're all in place, we need to build the docker image:

docker build -t basic_honeypot .

Once that's done, we can run the Dockerised honeypot with:

docker run -v ${PWD}:/usr/src/app -p 2222:2222 basic_honeypot

The parameter -v ${PWD}:/usr/src/app sets the current directory as the Docker container's volume. This means the file ssh_honeypot.log will be created in your honeypot's directory (instead of docker's less-accessible virtual volume).

With the honeypot running, you should see new connections appear in the ssh_honeypot.log file. Try testing it by connection to the honeypot with the usual SSH command:

ssh test@[honeypot-ip]

You should see your connection appear in the log file ssh_honeypot.log.

GitHub repo

All the code for this SSH honeypot is available on my GitHub repo at: github.com/sjbell/basic_ssh_honeypot.

Coming up in Part 2: we'll add functionality to collect malware samples that are uploaded to the honeypot.

Main image credit: A bundle of optical fibers. by Denny Muller. Logos for Python and Docker.

About the author

Simon BellSimon Bell is an award-winning Cyber Security Researcher, Software Engineer, and Web Security Specialist. Simon's research papers have been published internationally, and his findings have featured in Ars Technica, The Hacker News, PC World, among others. He founded Secure Honey, an open-source honeypot and threat intelligence project, in 2013. He has a PhD in Information Security and a BSc in Computer Science.

Follow Simon on Twitter: @SimonByte