NoSQL บน Object Storage

วันนี้ได้ลองทำการทดสอบเล็กๆ มีคำถามอยู่ว่าถ้ามีสินค้า 1ล้านรายการหรือมากกว่า การเรียกใช้ได้เร็ว การค้ณหาข้อมูลต้องน้อยกว่าวินาที ที่นึกออกคงต้องทำ NoSQL วันนี้ก็เลยลองทำการทดสอบ เกี่ยวกับการทำ NoSQL และการนำไปใช้งาน ซึ่งสามารถนำมาอธิบายได้ดังนี้

2024-07-10 02:05:08 - @ratanon


1. การสร้างฐานข้อมูล NoSQL บน Object Storage:

  - ใช้ Vultr Object Storage เป็นที่เก็บข้อมูล

  - แปลงข้อมูลจากไฟล์ CSV เป็น JSON

  - แบ่งข้อมูลเป็นไฟล์ย่อยเพื่อการจัดการที่มีประสิทธิภาพ


2. การสร้างดัชนี (Indexing):

  - สร้างดัชนีแบบ Inverted Index สำหรับการค้นหาคำสำคัญ

  - สร้าง Prefix Trie สำหรับการค้นหาแบบ autocomplete

  - สร้างดัชนีตามหมวดหมู่สินค้า


3. การประมวลผลข้อมูล:

  - ใช้ multi-threading เพื่อเพิ่มประสิทธิภาพในการประมวลผลไฟล์ขนาดใหญ่

  - ทำการ tokenize ข้อความภาษาไทยและอังกฤษ

  - ทำความสะอาดข้อมูลและจัดการกับอักขระพิเศษ


4. การจัดเก็บข้อมูลบน Object Storage:

  - แยกเก็บข้อมูลสินค้าและดัชนีในโครงสร้างโฟลเดอร์ที่เหมาะสม

  - ใช้การเข้ารหัสและการตั้งชื่อไฟล์ที่ปลอดภัย


5. การสร้าง API สำหรับการเข้าถึงข้อมูล:

  - พัฒนา Flask API สำหรับการวิเคราะห์เนื้อหาและค้นหาสินค้าที่เกี่ยวข้อง

  - ใช้ word tokenization เพื่อแยกคำสำคัญจากข้อความ input

  - ค้นหาสินค้าที่เกี่ยวข้องโดยใช้ดัชนีที่สร้างไว้


6. การสร้างลิงก์ Affiliate:

  - พัฒนาระบบสร้างลิงก์ Affiliate แบบไดนามิก

  - รองรับการสร้างลิงก์ mouseover ที่มีการเข้ารหัสข้อมูล trace

  - จัดการกับพารามิเตอร์การติดตามต่างๆ เช่น utm_source, utm_medium


7. การแสดงผลและ UI:

  - สร้าง HTML output ที่แสดงสินค้าพร้อมลิงก์ Affiliate

  - ใช้ CSS เพื่อจัดรูปแบบการแสดงผลให้น่าสนใจ

  - เพิ่ม JavaScript เพื่อจัดการเหตุการณ์ mouseover บนลิงก์สินค้า


8. การปรับแต่งประสิทธิภาพ:

  - ใช้ caching เพื่อลดการเรียกข้อมูลซ้ำ

  - แบ่งการประมวลผลเป็นชุดเพื่อจัดการกับข้อมูลขนาดใหญ่

  - ใช้การทำงานแบบ asynchronous เพื่อเพิ่มประสิทธิภาพ


9. ความปลอดภัยและการจัดการข้อมูล:

  - ใช้การเข้ารหัสสำหรับข้อมูลที่ละเอียดอ่อน

  - จัดการสิทธิ์การเข้าถึงไฟล์บน Object Storage


10. การนำไปใช้งาน:

  - สามารถนำไปใช้ในระบบแนะนำสินค้า (Recommendation System)

  - ใช้ในการสร้างระบบ Content Marketing ที่เชื่อมโยงกับสินค้า

  - ประยุกต์ใช้ในระบบ Affiliate Marketing

การนำเข้าข้อมูลเพื่อทำ ดัชนี (Indexing)

จัดไปเบาๆ แยก CSV จากสินค้า 1 ล้านรายการ ให้เป็นไฟล์ย่อยก่อน

csv-path.py

import csv
import os


def split_csv(input_file, output_folder, rows_per_file=100000):
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    with open(input_file, 'r', newline='', encoding='utf-8') as csvfile:
        reader = csv.reader(csvfile)
        header = next(reader)  # อ่านส่วนหัวของไฟล์

        file_number = 1
        row_count = 0
        current_writer = None
        current_file = None

        for row in reader:
            if row_count % rows_per_file == 0:
                if current_file:
                    current_file.close()

                output_file = os.path.join(output_folder, f'products_{file_number:03d}.csv')
                current_file = open(output_file, 'w', newline='', encoding='utf-8')
                current_writer = csv.writer(current_file)
                current_writer.writerow(header)  # เขียนส่วนหัวในทุกไฟล์
                file_number += 1

            current_writer.writerow(row)
            row_count += 1

        if current_file:
            current_file.close()

    print(f"แบ่งไฟล์เสร็จสิ้น ได้ทั้งหมด {file_number - 1} ไฟล์")


# ใช้งานฟังก์ชัน
input_file = 'data.csv'  # ชื่อไฟล์ CSV ขนาดใหญ่ของคุณ
output_folder = 'split_products'  # โฟลเดอร์ที่จะเก็บไฟล์ย่อย
rows_per_file = 1000  # จำนวนแถวต่อไฟล์ย่อย

split_csv(input_file, output_folder, rows_per_file)



แยกแล้วทำการประมวลผลเพื่อนำไปลง Object Storage และทำ ดัชนี (Indexing) จากที่แยกไฟล์ออกได้ 100ไฟล์

shopee-import-nosql.py

import csv
import json
import boto3
from botocore.config import Config
from pythainlp import word_tokenize
import nltk
from nltk.tokenize import word_tokenize as en_word_tokenize
import re
import urllib.parse
from concurrent.futures import ThreadPoolExecutor, as_completed
import os

nltk.download('punkt', quiet=True)

# Vultr API credentials and settings
ACCESS_KEY_ID = ''
SECRET_ACCESS_KEY = ''
BUCKET_NAME = 'nosql'
ENDPOINT_URL = 'https://sgp1.vultrobjects.com'

# Initialize the S3 resource with Vultr credentials
s3 = boto3.resource('s3',
                    endpoint_url=ENDPOINT_URL,
                    aws_access_key_id=ACCESS_KEY_ID,
                    aws_secret_access_key=SECRET_ACCESS_KEY,
                    config=Config(signature_version='s3v4'),
                    region_name='sgp1')

# Global dictionaries for caching
category_cache = {}
inverted_index_cache = {}
prefix_trie_cache = {}

def is_thai(text):
    return any("\u0E00" <= c <= "\u0E7F" for c in text)

def tokenize_text(text):
    if is_thai(text):
        return [word for word in word_tokenize(text, engine='newmm') if len(word) <= 20]
    else:
        return [word for word in en_word_tokenize(text) if len(word) <= 20]

def clean_filename(filename):
    filename = urllib.parse.unquote(filename)
    filename = re.sub(r'[^\w\-ก-๙\._ ]', '_', filename)  # อนุญาตอักษรภาษาไทย
    filename = re.sub(r'_+', '_', filename)
    return filename.strip('_')

def update_index(index_type, key1, key2=None, key3=None, itemid=None):
    if index_type == 'categories':
        if key1 not in category_cache:
            category_cache[key1] = {}
        if key2 not in category_cache[key1]:
            category_cache[key1][key2] = {}
        if key3 not in category_cache[key1][key2]:
            category_cache[key1][key2][key3] = set()
        category_cache[key1][key2][key3].add(itemid)
    elif index_type == 'inverted_index':
        if key1 not in inverted_index_cache:
            inverted_index_cache[key1] = set()
        inverted_index_cache[key1].add(itemid)
    elif index_type == 'prefix_trie':
        if key1 not in prefix_trie_cache:
            prefix_trie_cache[key1] = set()
        prefix_trie_cache[key1].add(itemid)

def process_csv_chunk(chunk):
    for row in chunk:
        try:
            item_filename = f'shopee/items/item_{row["itemid"]}.json'
            s3.Object(BUCKET_NAME, item_filename).put(
                Body=json.dumps(row, ensure_ascii=False),
                ACL='public-read',
                ContentType='application/json'
            )

            update_index('categories', row['global_category1'], row['global_category2'], row['global_category3'], row['itemid'])

            words = tokenize_text(row['title'])
            for word in words[:3]:
                clean_word = clean_filename(word)
                update_index('prefix_trie', clean_word, itemid=row['itemid'])
                update_index('inverted_index', clean_word, itemid=row['itemid'])
        except Exception as e:
            print(f"Error processing row {row.get('itemid', 'unknown')}: {str(e)}")

def process_csv_file(csv_file_path):
    chunk_size = 1000
    with open(csv_file_path, 'r', encoding='utf-8') as csvfile:
        csvreader = csv.DictReader(csvfile)
        chunks = []
        current_chunk = []
        for i, row in enumerate(csvreader):
            current_chunk.append(row)
            if len(current_chunk) == chunk_size:
                chunks.append(current_chunk)
                current_chunk = []
            if i % 1000 == 0:
                print(f"Read {i} rows from {csv_file_path}")
        if current_chunk:
            chunks.append(current_chunk)

    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(process_csv_chunk, chunk) for chunk in chunks]
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                print(f"Error in thread: {str(e)}")

    # Upload indexes after processing each file
    upload_cached_indexes()
    print(f"Uploaded indexes for {csv_file_path}")

def upload_cached_indexes():
    for key, value in category_cache.items():
        filename = f'shopee/indexes/categories_{clean_filename(key)}.json'
        s3.Object(BUCKET_NAME, filename).put(
            Body=json.dumps({k: {kk: list(vv) for kk, vv in v.items()} for k, v in value.items()}, ensure_ascii=False),
            ACL='public-read',
            ContentType='application/json'
        )

    for key, value in inverted_index_cache.items():
        filename = f'shopee/indexes/inverted_index_{clean_filename(key)}.json'
        s3.Object(BUCKET_NAME, filename).put(
            Body=json.dumps(list(value), ensure_ascii=False),
            ACL='public-read',
            ContentType='application/json'
        )

    for key, value in prefix_trie_cache.items():
        filename = f'shopee/indexes/prefix_trie_{clean_filename(key)}.json'
        s3.Object(BUCKET_NAME, filename).put(
            Body=json.dumps(list(value), ensure_ascii=False),
            ACL='public-read',
            ContentType='application/json'
        )

def ensure_folders_exist():
    folders = ['shopee/indexes', 'shopee/items']
    for folder in folders:
        s3.Object(BUCKET_NAME, f"{folder}/").put(Body='')


if __name__ == "__main__":
    try:
        ensure_folders_exist()

        # สร้างรายการไฟล์ CSV ตั้งแต่ 001 ถึง 1000
        csv_files = [f'products_{i:03d}.csv' for i in range(10, 1001)]

        for csv_file in csv_files:
            csv_file_path = os.path.join('split_products', csv_file)
            if os.path.exists(csv_file_path):
                print(f"Processing {csv_file}")
                process_csv_file(csv_file_path)
            else:
                print(f"File {csv_file} not found. Skipping.")

        print("Final upload of cached indexes")
        upload_cached_indexes()
        print("Process completed successfully.")
    except Exception as e:
        print(f"An error occurred: {str(e)}")


Code API เพื่อนำข้อมูลจาก NoSQL มาใช้งาน

shopee.py

from flask import Blueprint, request, jsonify
from pythainlp import word_tokenize
import boto3
from botocore.config import Config
import json
import re
import random
import urllib.parse
import base64
import uuid

shopee = Blueprint('shopee', __name__)

# Vultr API credentials and settings
ACCESS_KEY_ID = ''
SECRET_ACCESS_KEY = ''
BUCKET_NAME = 'nosql'
ENDPOINT_URL = 'https://sgp1.vultrobjects.com'

# Initialize the S3 client with Vultr credentials
s3 = boto3.client('s3',
                  endpoint_url=ENDPOINT_URL,
                  aws_access_key_id=ACCESS_KEY_ID,
                  aws_secret_access_key=SECRET_ACCESS_KEY,
                  config=Config(signature_version='s3v4'),
                  region_name='sgp1')


def clean_filename(filename):
    filename = re.sub(r'[^\w\-ก-๙\._ ]', '_', filename)
    filename = re.sub(r'_+', '_', filename)
    return filename.strip('_')


def get_index_data(index_type, key):
    try:
        filename = f'shopee/indexes/{index_type}_{clean_filename(key)}.json'
        response = s3.get_object(Bucket=BUCKET_NAME, Key=filename)
        return json.loads(response['Body'].read().decode('utf-8'))
    except Exception as e:
        print(f"Error getting index data: {str(e)}")
        return {}


def get_item_data(itemid):
    try:
        filename = f'shopee/items/item_{itemid}.json'
        response = s3.get_object(Bucket=BUCKET_NAME, Key=filename)
        return json.loads(response['Body'].read().decode('utf-8'))
    except Exception as e:
        print(f"Error getting item data: {str(e)}")
        return None


def create_affiliate_link(product_link, affiliate_id):
    parsed_url = urllib.parse.urlparse(product_link)
    query_params = urllib.parse.parse_qs(parsed_url.query)

    # Generate dynamic tracking IDs
    uls_trackid = f"507{uuid.uuid4().hex[:8]}0109"
    utm_term = f"bf1{uuid.uuid4().hex[:10]}"

    # Create trace data
    trace_data = {
        "trace_id": f"0.{uuid.uuid4().hex[:12]}.104",
        "pre_trace_id": f"0.{uuid.uuid4().hex[:12]}.600",
        "list_type": 104,
        "root_trace_id": f"0.{uuid.uuid4().hex[:12]}.600",
        "root_list_type": 600,
        "depth": 1
    }
    encoded_trace = base64.urlsafe_b64encode(json.dumps(trace_data).encode()).decode()

    # Extract product ID from the URL
    product_id = parsed_url.path.split('/')[-1]

    # Create affiliate link
    affiliate_params = {
        'uls_trackid': uls_trackid,
        'utm_campaign': '-',
        'utm_content': '----',
        'utm_medium': 'affiliates',
        'utm_source': f'an_{affiliate_id}',
        'utm_term': utm_term
    }

    affiliate_url = f"https://shopee.co.th/product/{parsed_url.path.split('/')[-2]}/{product_id}?{urllib.parse.urlencode(affiliate_params)}"

    # Create mouseover link
    mouseover_link = f"https://affiliate.shopee.co.th/offer/product_offer/{product_id}?trace={encoded_trace}"

    return affiliate_url, mouseover_link


@shopee.route('/analyze', methods=['POST', 'GET'])
def analyze_content():
    if request.method == 'POST':
        data = request.get_json()
        content = data.get('content', '')

        words = word_tokenize(content)

        related_item_ids = set()
        for word in words[:3]:
            clean_word = clean_filename(word)
            index_data = get_index_data('inverted_index', clean_word)
            related_item_ids.update(index_data)

        selected_item_ids = random.sample(list(related_item_ids), min(5, len(related_item_ids)))

        css = '''
            <style>
                .shopee-container {
                    display: flex;
                    flex-wrap: wrap;
                    justify-content: space-around;
                }
                .shopee-item {
                    width: 200px;
                    margin: 10px;
                    text-align: center;
                }
                .image-container {
                    position: relative;
                    width: 100%;
                }
                .shopee-item img {
                    max-width: 100%;
                    height: auto;
                }
                .shopee-price {
                    position: absolute;
                    bottom: 10px;
                    right: 10px;
                    background-color: rgba(255, 255, 255, 0.7);
                    padding: 5px;
                    border-radius: 5px;
                    font-weight: bold;
                    color: red;
                }
                .shopee-title {
                    margin-top: 5px;
                    font-size: 14px;
                }
            </style>
        '''

        html_output = css + '<div class="shopee-container">'
        for itemid in selected_item_ids:
            item_data = get_item_data(itemid)
            if item_data:
                price = item_data.get('price', 'N/A')
                affiliate_id = "15344040055"  # แทนที่ด้วย Affiliate ID ของคุณ
                product_link = item_data['product_link']
                affiliate_link, mouseover_link = create_affiliate_link(product_link, affiliate_id)

                html_output += f'''
                    <div class="shopee-item">
                        <a href="{affiliate_link}" target="_blank" data-mouseover="{mouseover_link}">
                            <div class="image-container">
                                <img src="{item_data['image_link']}" alt="{item_data['title']}">
                                <div class="shopee-price">{price}บาท</div>
                            </div>
                            <p class="shopee-title">{item_data['title']}</p>
                        </a>
                    </div>
                '''
        html_output += '</div>'

        # Add JavaScript to handle mouseover event
        html_output += '''
        <script>
        document.querySelectorAll('.shopee-item a').forEach(link => {
            link.addEventListener('mouseover', function() {
                this.setAttribute('href', this.getAttribute('data-mouseover'));
            });
            link.addEventListener('mouseout', function() {
                this.setAttribute('href', this.getAttribute('data-original-link'));
            });
            link.setAttribute('data-original-link', link.getAttribute('href'));
        });
        </script>
        '''

        return jsonify(html=html_output)
    else:
        return "Shopee analyze running"


def create_app():
    from flask import Flask
    app = Flask(__name__)
    app.register_blueprint(shopee, url_prefix='/shopee')
    return app


if __name__ == '__main__':
    app = create_app()
    app.run(host='0.0.0.0', port=5001)


Code เรียกใช้งาน API

.js สำหรับนำไปแปะ เพื่อนำ <h1 ยิงไป api เพื่อวิเคราะข้อมูลและส่งกลับมาว่าจะแสดงสินค้าอะไร

<script>
    async function fetchShopeeItems() {
        const heading = document.querySelector('h1').innerText;
        try {
            const response = await fetch('https://api.m1n.app:8443/shopee/analyze', {
                method: 'POST',
                headers: { 'Content-Type': 'application/json' },
                body: JSON.stringify({ content: heading })
            });
            if (response.ok) {
                const result = await response.json();
                document.getElementById('shopee-container').innerHTML = result.html;
            } else {
                console.error('Error fetching Shopee items:', response.statusText);
            }
        } catch (error) {
            console.error('Error:', error);
        }
    }

    document.addEventListener('DOMContentLoaded', fetchShopeeItems);
</script>


ส่วนของการแสดงโฆษณา

<div id="shopee-container"></div>


More Posts