วันนี้ได้ลองทำการทดสอบเล็กๆ มีคำถามอยู่ว่าถ้ามีสินค้า 1ล้านรายการหรือมากกว่า การเรียกใช้ได้เร็ว การค้ณหาข้อมูลต้องน้อยกว่าวินาที ที่นึกออกคงต้องทำ NoSQL วันนี้ก็เลยลองทำการทดสอบ เกี่ยวกับการทำ NoSQL และการนำไปใช้งาน ซึ่งสามารถนำมาอธิบายได้ดังนี้
1. การสร้างฐานข้อมูล NoSQL บน Object Storage:
- ใช้ Vultr Object Storage เป็นที่เก็บข้อมูล
- แปลงข้อมูลจากไฟล์ CSV เป็น JSON
- แบ่งข้อมูลเป็นไฟล์ย่อยเพื่อการจัดการที่มีประสิทธิภาพ
2. การสร้างดัชนี (Indexing):
- สร้างดัชนีแบบ Inverted Index สำหรับการค้นหาคำสำคัญ
- สร้าง Prefix Trie สำหรับการค้นหาแบบ autocomplete
- สร้างดัชนีตามหมวดหมู่สินค้า
3. การประมวลผลข้อมูล:
- ใช้ multi-threading เพื่อเพิ่มประสิทธิภาพในการประมวลผลไฟล์ขนาดใหญ่
- ทำการ tokenize ข้อความภาษาไทยและอังกฤษ
- ทำความสะอาดข้อมูลและจัดการกับอักขระพิเศษ
4. การจัดเก็บข้อมูลบน Object Storage:
- แยกเก็บข้อมูลสินค้าและดัชนีในโครงสร้างโฟลเดอร์ที่เหมาะสม
- ใช้การเข้ารหัสและการตั้งชื่อไฟล์ที่ปลอดภัย
5. การสร้าง API สำหรับการเข้าถึงข้อมูล:
- พัฒนา Flask API สำหรับการวิเคราะห์เนื้อหาและค้นหาสินค้าที่เกี่ยวข้อง
- ใช้ word tokenization เพื่อแยกคำสำคัญจากข้อความ input
- ค้นหาสินค้าที่เกี่ยวข้องโดยใช้ดัชนีที่สร้างไว้
6. การสร้างลิงก์ Affiliate:
- พัฒนาระบบสร้างลิงก์ Affiliate แบบไดนามิก
- รองรับการสร้างลิงก์ mouseover ที่มีการเข้ารหัสข้อมูล trace
- จัดการกับพารามิเตอร์การติดตามต่างๆ เช่น utm_source, utm_medium
7. การแสดงผลและ UI:
- สร้าง HTML output ที่แสดงสินค้าพร้อมลิงก์ Affiliate
- ใช้ CSS เพื่อจัดรูปแบบการแสดงผลให้น่าสนใจ
- เพิ่ม JavaScript เพื่อจัดการเหตุการณ์ mouseover บนลิงก์สินค้า
8. การปรับแต่งประสิทธิภาพ:
- ใช้ caching เพื่อลดการเรียกข้อมูลซ้ำ
- แบ่งการประมวลผลเป็นชุดเพื่อจัดการกับข้อมูลขนาดใหญ่
- ใช้การทำงานแบบ asynchronous เพื่อเพิ่มประสิทธิภาพ
9. ความปลอดภัยและการจัดการข้อมูล:
- ใช้การเข้ารหัสสำหรับข้อมูลที่ละเอียดอ่อน
- จัดการสิทธิ์การเข้าถึงไฟล์บน Object Storage
10. การนำไปใช้งาน:
- สามารถนำไปใช้ในระบบแนะนำสินค้า (Recommendation System)
- ใช้ในการสร้างระบบ Content Marketing ที่เชื่อมโยงกับสินค้า
- ประยุกต์ใช้ในระบบ Affiliate Marketing
จัดไปเบาๆ แยก CSV จากสินค้า 1 ล้านรายการ ให้เป็นไฟล์ย่อยก่อน
csv-path.py
import csv import os def split_csv(input_file, output_folder, rows_per_file=100000): if not os.path.exists(output_folder): os.makedirs(output_folder) with open(input_file, 'r', newline='', encoding='utf-8') as csvfile: reader = csv.reader(csvfile) header = next(reader) # อ่านส่วนหัวของไฟล์ file_number = 1 row_count = 0 current_writer = None current_file = None for row in reader: if row_count % rows_per_file == 0: if current_file: current_file.close() output_file = os.path.join(output_folder, f'products_{file_number:03d}.csv') current_file = open(output_file, 'w', newline='', encoding='utf-8') current_writer = csv.writer(current_file) current_writer.writerow(header) # เขียนส่วนหัวในทุกไฟล์ file_number += 1 current_writer.writerow(row) row_count += 1 if current_file: current_file.close() print(f"แบ่งไฟล์เสร็จสิ้น ได้ทั้งหมด {file_number - 1} ไฟล์") # ใช้งานฟังก์ชัน input_file = 'data.csv' # ชื่อไฟล์ CSV ขนาดใหญ่ของคุณ output_folder = 'split_products' # โฟลเดอร์ที่จะเก็บไฟล์ย่อย rows_per_file = 1000 # จำนวนแถวต่อไฟล์ย่อย split_csv(input_file, output_folder, rows_per_file)
แยกแล้วทำการประมวลผลเพื่อนำไปลง Object Storage และทำ ดัชนี (Indexing) จากที่แยกไฟล์ออกได้ 100ไฟล์
shopee-import-nosql.py
import csv import json import boto3 from botocore.config import Config from pythainlp import word_tokenize import nltk from nltk.tokenize import word_tokenize as en_word_tokenize import re import urllib.parse from concurrent.futures import ThreadPoolExecutor, as_completed import os nltk.download('punkt', quiet=True) # Vultr API credentials and settings ACCESS_KEY_ID = '' SECRET_ACCESS_KEY = '' BUCKET_NAME = 'nosql' ENDPOINT_URL = 'https://sgp1.vultrobjects.com' # Initialize the S3 resource with Vultr credentials s3 = boto3.resource('s3', endpoint_url=ENDPOINT_URL, aws_access_key_id=ACCESS_KEY_ID, aws_secret_access_key=SECRET_ACCESS_KEY, config=Config(signature_version='s3v4'), region_name='sgp1') # Global dictionaries for caching category_cache = {} inverted_index_cache = {} prefix_trie_cache = {} def is_thai(text): return any("\u0E00" <= c <= "\u0E7F" for c in text) def tokenize_text(text): if is_thai(text): return [word for word in word_tokenize(text, engine='newmm') if len(word) <= 20] else: return [word for word in en_word_tokenize(text) if len(word) <= 20] def clean_filename(filename): filename = urllib.parse.unquote(filename) filename = re.sub(r'[^\w\-ก-๙\._ ]', '_', filename) # อนุญาตอักษรภาษาไทย filename = re.sub(r'_+', '_', filename) return filename.strip('_') def update_index(index_type, key1, key2=None, key3=None, itemid=None): if index_type == 'categories': if key1 not in category_cache: category_cache[key1] = {} if key2 not in category_cache[key1]: category_cache[key1][key2] = {} if key3 not in category_cache[key1][key2]: category_cache[key1][key2][key3] = set() category_cache[key1][key2][key3].add(itemid) elif index_type == 'inverted_index': if key1 not in inverted_index_cache: inverted_index_cache[key1] = set() inverted_index_cache[key1].add(itemid) elif index_type == 'prefix_trie': if key1 not in prefix_trie_cache: prefix_trie_cache[key1] = set() prefix_trie_cache[key1].add(itemid) def process_csv_chunk(chunk): for row in chunk: try: item_filename = f'shopee/items/item_{row["itemid"]}.json' s3.Object(BUCKET_NAME, item_filename).put( Body=json.dumps(row, ensure_ascii=False), ACL='public-read', ContentType='application/json' ) update_index('categories', row['global_category1'], row['global_category2'], row['global_category3'], row['itemid']) words = tokenize_text(row['title']) for word in words[:3]: clean_word = clean_filename(word) update_index('prefix_trie', clean_word, itemid=row['itemid']) update_index('inverted_index', clean_word, itemid=row['itemid']) except Exception as e: print(f"Error processing row {row.get('itemid', 'unknown')}: {str(e)}") def process_csv_file(csv_file_path): chunk_size = 1000 with open(csv_file_path, 'r', encoding='utf-8') as csvfile: csvreader = csv.DictReader(csvfile) chunks = [] current_chunk = [] for i, row in enumerate(csvreader): current_chunk.append(row) if len(current_chunk) == chunk_size: chunks.append(current_chunk) current_chunk = [] if i % 1000 == 0: print(f"Read {i} rows from {csv_file_path}") if current_chunk: chunks.append(current_chunk) with ThreadPoolExecutor(max_workers=10) as executor: futures = [executor.submit(process_csv_chunk, chunk) for chunk in chunks] for future in as_completed(futures): try: future.result() except Exception as e: print(f"Error in thread: {str(e)}") # Upload indexes after processing each file upload_cached_indexes() print(f"Uploaded indexes for {csv_file_path}") def upload_cached_indexes(): for key, value in category_cache.items(): filename = f'shopee/indexes/categories_{clean_filename(key)}.json' s3.Object(BUCKET_NAME, filename).put( Body=json.dumps({k: {kk: list(vv) for kk, vv in v.items()} for k, v in value.items()}, ensure_ascii=False), ACL='public-read', ContentType='application/json' ) for key, value in inverted_index_cache.items(): filename = f'shopee/indexes/inverted_index_{clean_filename(key)}.json' s3.Object(BUCKET_NAME, filename).put( Body=json.dumps(list(value), ensure_ascii=False), ACL='public-read', ContentType='application/json' ) for key, value in prefix_trie_cache.items(): filename = f'shopee/indexes/prefix_trie_{clean_filename(key)}.json' s3.Object(BUCKET_NAME, filename).put( Body=json.dumps(list(value), ensure_ascii=False), ACL='public-read', ContentType='application/json' ) def ensure_folders_exist(): folders = ['shopee/indexes', 'shopee/items'] for folder in folders: s3.Object(BUCKET_NAME, f"{folder}/").put(Body='') if __name__ == "__main__": try: ensure_folders_exist() # สร้างรายการไฟล์ CSV ตั้งแต่ 001 ถึง 1000 csv_files = [f'products_{i:03d}.csv' for i in range(10, 1001)] for csv_file in csv_files: csv_file_path = os.path.join('split_products', csv_file) if os.path.exists(csv_file_path): print(f"Processing {csv_file}") process_csv_file(csv_file_path) else: print(f"File {csv_file} not found. Skipping.") print("Final upload of cached indexes") upload_cached_indexes() print("Process completed successfully.") except Exception as e: print(f"An error occurred: {str(e)}")
shopee.py
from flask import Blueprint, request, jsonify from pythainlp import word_tokenize import boto3 from botocore.config import Config import json import re import random import urllib.parse import base64 import uuid shopee = Blueprint('shopee', __name__) # Vultr API credentials and settings ACCESS_KEY_ID = '' SECRET_ACCESS_KEY = '' BUCKET_NAME = 'nosql' ENDPOINT_URL = 'https://sgp1.vultrobjects.com' # Initialize the S3 client with Vultr credentials s3 = boto3.client('s3', endpoint_url=ENDPOINT_URL, aws_access_key_id=ACCESS_KEY_ID, aws_secret_access_key=SECRET_ACCESS_KEY, config=Config(signature_version='s3v4'), region_name='sgp1') def clean_filename(filename): filename = re.sub(r'[^\w\-ก-๙\._ ]', '_', filename) filename = re.sub(r'_+', '_', filename) return filename.strip('_') def get_index_data(index_type, key): try: filename = f'shopee/indexes/{index_type}_{clean_filename(key)}.json' response = s3.get_object(Bucket=BUCKET_NAME, Key=filename) return json.loads(response['Body'].read().decode('utf-8')) except Exception as e: print(f"Error getting index data: {str(e)}") return {} def get_item_data(itemid): try: filename = f'shopee/items/item_{itemid}.json' response = s3.get_object(Bucket=BUCKET_NAME, Key=filename) return json.loads(response['Body'].read().decode('utf-8')) except Exception as e: print(f"Error getting item data: {str(e)}") return None def create_affiliate_link(product_link, affiliate_id): parsed_url = urllib.parse.urlparse(product_link) query_params = urllib.parse.parse_qs(parsed_url.query) # Generate dynamic tracking IDs uls_trackid = f"507{uuid.uuid4().hex[:8]}0109" utm_term = f"bf1{uuid.uuid4().hex[:10]}" # Create trace data trace_data = { "trace_id": f"0.{uuid.uuid4().hex[:12]}.104", "pre_trace_id": f"0.{uuid.uuid4().hex[:12]}.600", "list_type": 104, "root_trace_id": f"0.{uuid.uuid4().hex[:12]}.600", "root_list_type": 600, "depth": 1 } encoded_trace = base64.urlsafe_b64encode(json.dumps(trace_data).encode()).decode() # Extract product ID from the URL product_id = parsed_url.path.split('/')[-1] # Create affiliate link affiliate_params = { 'uls_trackid': uls_trackid, 'utm_campaign': '-', 'utm_content': '----', 'utm_medium': 'affiliates', 'utm_source': f'an_{affiliate_id}', 'utm_term': utm_term } affiliate_url = f"https://shopee.co.th/product/{parsed_url.path.split('/')[-2]}/{product_id}?{urllib.parse.urlencode(affiliate_params)}" # Create mouseover link mouseover_link = f"https://affiliate.shopee.co.th/offer/product_offer/{product_id}?trace={encoded_trace}" return affiliate_url, mouseover_link @shopee.route('/analyze', methods=['POST', 'GET']) def analyze_content(): if request.method == 'POST': data = request.get_json() content = data.get('content', '') words = word_tokenize(content) related_item_ids = set() for word in words[:3]: clean_word = clean_filename(word) index_data = get_index_data('inverted_index', clean_word) related_item_ids.update(index_data) selected_item_ids = random.sample(list(related_item_ids), min(5, len(related_item_ids))) css = ''' <style> .shopee-container { display: flex; flex-wrap: wrap; justify-content: space-around; } .shopee-item { width: 200px; margin: 10px; text-align: center; } .image-container { position: relative; width: 100%; } .shopee-item img { max-width: 100%; height: auto; } .shopee-price { position: absolute; bottom: 10px; right: 10px; background-color: rgba(255, 255, 255, 0.7); padding: 5px; border-radius: 5px; font-weight: bold; color: red; } .shopee-title { margin-top: 5px; font-size: 14px; } </style> ''' html_output = css + '<div class="shopee-container">' for itemid in selected_item_ids: item_data = get_item_data(itemid) if item_data: price = item_data.get('price', 'N/A') affiliate_id = "15344040055" # แทนที่ด้วย Affiliate ID ของคุณ product_link = item_data['product_link'] affiliate_link, mouseover_link = create_affiliate_link(product_link, affiliate_id) html_output += f''' <div class="shopee-item"> <a href="{affiliate_link}" target="_blank" data-mouseover="{mouseover_link}"> <div class="image-container"> <img src="{item_data['image_link']}" alt="{item_data['title']}"> <div class="shopee-price">{price}บาท</div> </div> <p class="shopee-title">{item_data['title']}</p> </a> </div> ''' html_output += '</div>' # Add JavaScript to handle mouseover event html_output += ''' <script> document.querySelectorAll('.shopee-item a').forEach(link => { link.addEventListener('mouseover', function() { this.setAttribute('href', this.getAttribute('data-mouseover')); }); link.addEventListener('mouseout', function() { this.setAttribute('href', this.getAttribute('data-original-link')); }); link.setAttribute('data-original-link', link.getAttribute('href')); }); </script> ''' return jsonify(html=html_output) else: return "Shopee analyze running" def create_app(): from flask import Flask app = Flask(__name__) app.register_blueprint(shopee, url_prefix='/shopee') return app if __name__ == '__main__': app = create_app() app.run(host='0.0.0.0', port=5001)
.js สำหรับนำไปแปะ เพื่อนำ <h1 ยิงไป api เพื่อวิเคราะข้อมูลและส่งกลับมาว่าจะแสดงสินค้าอะไร
<script> async function fetchShopeeItems() { const heading = document.querySelector('h1').innerText; try { const response = await fetch('https://api.m1n.app:8443/shopee/analyze', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ content: heading }) }); if (response.ok) { const result = await response.json(); document.getElementById('shopee-container').innerHTML = result.html; } else { console.error('Error fetching Shopee items:', response.statusText); } } catch (error) { console.error('Error:', error); } } document.addEventListener('DOMContentLoaded', fetchShopeeItems); </script>
ส่วนของการแสดงโฆษณา
<div id="shopee-container"></div>