Programming/Python 3

Python SSE webshell 샘플 (ansi2html 적용)

Lawmin 2025. 3. 7. 16:58

 

추후 참고 차, 일반화해서 간략하게 작성한 것으로,

서버의 명령어를 실행하여 ansi 출력물을 html로 스트리밍 해오는 샘플 코드입니다.

(중지 가능)

 

pip install 로 flask, ansi2html 등 필요한 라이브러리 설치하고,

app.py 등으로 저장 후, python app.py 로 실행하면 됩니다.

이후 해당 서버 IP:5000 으로 접속하면 테스트 가능합니다.

 

from flask import Flask, request, render_template_string, Response, send_from_directory
import urllib.parse
import subprocess
import ansi2html
import re

app = Flask(__name__)

process = None

HTML_TEMPLATE = '''
<!doctype html>
<html>
<head>
    <title>Web shell</title>
    <style>
        #loading_icon {
            display: none;
            width: 16px;
            height: 16px;
            border: 3px solid rgba(0, 0, 255, 0.3);
            border-top: 3px solid blue;
            border-radius: 50%;
            animation: spin 1s linear infinite;
            margin-left: 10px;
            vertical-align: middle;
        }

        @keyframes spin {
            0% { transform: rotate(0deg); }
            100% { transform: rotate(360deg); }
        }

        pre {
            background: black;
            color: white;
            padding: 10px;
            font-family: "Courier New", Courier, monospace;
            font-size: small;
            white-space: pre-wrap;
            height: calc(100vh - 100px);
            overflow-y: auto;
            position: relative;
        }
    </style>
    <script>
        function runProcess() {
            var cmdText = document.getElementById("cmd_text").value;
            var runBtn = document.getElementById("run_button");
            var stopBtn = document.getElementById("stop_button");
            var outputDiv = document.getElementById("output");
            var loadingIcon = document.getElementById("loading_icon");

            if (!cmdText) {
                alert("명령줄을 입력하세요!");
                return;
            }

            runBtn.disabled = true;
            stopBtn.disabled = false;
            loadingIcon.style.display = "inline-block";
            outputDiv.innerHTML = "";

            var eventSource = new EventSource(`/run?cmd=${encodeURIComponent(cmdText)}`);

            var styleElement = document.head.querySelector("#ansi-style");
            if (!styleElement) {
                styleElement = document.createElement("style");
                styleElement.id = "ansi-style";
                document.head.insertAdjacentElement("afterbegin", styleElement);
            } else {
                styleElement.innerHTML = "";
            }

            eventSource.onmessage = function(event) {
                outputDiv.innerHTML += event.data + "<br>";
                outputDiv.scrollTop = outputDiv.scrollHeight;
            };

            eventSource.addEventListener("style_update", function(event) {
                styleElement.innerHTML += event.data;
            });
            
            eventSource.onerror = function() {
                eventSource.close();
                loadingIcon.style.display = "none";
                runBtn.disabled = false;
                stopBtn.disabled = true;
            };
        }
        
        function stopProcess() {
            fetch('/stop')
        }
    </script>
</head>
<body>
    <input type="text" id="cmd_text" placeholder="ls -al" required>
    <button id="run_button" onclick="runProcess()">실행</button>
    <button id="stop_button" onclick="stopProcess()" disabled>중지</button>
    <span id="loading_icon"></span>
    <pre id="output" style="background: black; color: white; padding: 10px;"></pre>
</body>
</html>
'''

@app.route('/')
def index():
    return HTML_TEMPLATE

@app.route('/run')
def run():
    cmdText = request.args.get('cmd', 'pwd')

    def generate():
        stored_lines = []
        try:
            cmds = urllib.parse.unquote(cmdText).split(" ")

            global process
            process = subprocess.Popen(cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

            ansi_converter = ansi2html.Ansi2HTMLConverter(dark_bg=True)

            for i, line in enumerate(process.stdout):
                stripped_line = line.strip() 
                stored_lines.append(stripped_line)
                html_line = ansi_converter.convert(stripped_line, full=False)
                yield f"data: {html_line}\n\n"

            process.wait()

            full_ansi_text = "\n".join(stored_lines)
            full_html = ansi_converter.convert(full_ansi_text, full=True)
            style_match = re.search(r'(<style.*?</style>)', full_html, re.DOTALL)
            style_tag = style_match.group(1) if style_match else ""
            if style_tag:
                for style_line in style_tag.split("\n"):
                    yield f"event: style_update\ndata: {style_line}\n\n"

        except Exception as e:
            yield f"data: <span style='color: red;'>{str(e)}</span>\n\n"

    return Response(generate(), mimetype='text/event-stream')

@app.route('/stop')
def stop_process():
    global process
    if process:
        process.terminate()
    
if __name__ == '__main__':
    app.run(debug=True, threaded=True, host='0.0.0.0', port='5000')