From 95a95cb4793927f5e8ee8ac1bad950733005d273 Mon Sep 17 00:00:00 2001 From: Rene Cannao Date: Tue, 23 Dec 2025 19:23:43 +0000 Subject: [PATCH 1/6] Add script to copy StackExchange Posts table from MySQL to SQLite3 server This Python script uses mysql.connector to copy the entire Posts table (248,905 rows) from MySQL to the ProxySQL SQLite3 server. The script handles schema conversion, proper escaping, and provides progress reporting. Tested with full copy taking 30 seconds at ~8,300 rows/sec. --- ...py_stackexchange_Posts_mysql_to_sqlite3.py | 183 ++++++++++++++++++ 1 file changed, 183 insertions(+) create mode 100755 scripts/copy_stackexchange_Posts_mysql_to_sqlite3.py diff --git a/scripts/copy_stackexchange_Posts_mysql_to_sqlite3.py b/scripts/copy_stackexchange_Posts_mysql_to_sqlite3.py new file mode 100755 index 0000000000..72e9341e6f --- /dev/null +++ b/scripts/copy_stackexchange_Posts_mysql_to_sqlite3.py @@ -0,0 +1,183 @@ +#!/usr/bin/env python3 +""" +Copy Posts table from MySQL to ProxySQL SQLite3 server. +Uses Python MySQL connectors for direct database access. 
+""" + +import mysql.connector +import sys +import time + +# Configuration +SOURCE_CONFIG = { + "host": "127.0.0.1", + "port": 3306, + "user": "stackexchange", + "password": "my-password", + "database": "stackexchange", + "use_pure": True, + "ssl_disabled": True +} + +DEST_CONFIG = { + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "root", + "database": "main", + "use_pure": True, + "ssl_disabled": True +} + +TABLE_NAME = "Posts" +LIMIT = 0 # 0 for all rows, otherwise limit for testing +BATCH_SIZE = 5000 # Larger batch for full copy +CLEAR_TABLE_FIRST = True # Delete existing data before copying + +COLUMNS = [ + "SiteId", "Id", "PostTypeId", "AcceptedAnswerId", "ParentId", + "CreationDate", "DeletionDate", "Score", "ViewCount", "Body", + "OwnerUserId", "OwnerDisplayName", "LastEditorUserId", "LastEditorDisplayName", + "LastEditDate", "LastActivityDate", "Title", "Tags", "AnswerCount", + "CommentCount", "FavoriteCount", "ClosedDate", "CommunityOwnedDate", "ContentLicense" +] + +def escape_sql_value(value): + """Escape a value for SQL insertion.""" + if value is None: + return "NULL" + # Convert to string + s = str(value) + # Escape single quotes by doubling + escaped = s.replace("'", "''") + return f"'{escaped}'" + +def generate_insert(row): + """Generate INSERT statement for a single row.""" + values_str = ", ".join(escape_sql_value(v) for v in row) + columns_str = ", ".join(COLUMNS) + return f"INSERT INTO {TABLE_NAME} ({columns_str}) VALUES ({values_str})" + +def main(): + print(f"Copying {TABLE_NAME} from MySQL to SQLite3 server...") + print(f"Source: {SOURCE_CONFIG['host']}:{SOURCE_CONFIG['port']}") + print(f"Destination: {DEST_CONFIG['host']}:{DEST_CONFIG['port']}") + if LIMIT > 0: + print(f"Limit: {LIMIT} rows") + else: + print(f"Copying all rows") + + # Connect to source (MySQL) + try: + source_conn = mysql.connector.connect(**SOURCE_CONFIG) + source_cursor = source_conn.cursor() + print("✓ Connected to MySQL source") + except Exception 
as e: + print(f"✗ Failed to connect to source MySQL: {e}") + sys.exit(1) + + # Connect to destination (ProxySQL SQLite3 server) + try: + dest_conn = mysql.connector.connect(**DEST_CONFIG) + dest_cursor = dest_conn.cursor() + print("✓ Connected to SQLite3 server destination") + except Exception as e: + print(f"✗ Failed to connect to destination SQLite3 server: {e}") + source_conn.close() + sys.exit(1) + + try: + # Clear destination table if requested + if CLEAR_TABLE_FIRST: + print("Clearing destination table...") + dest_cursor.execute(f"DELETE FROM {TABLE_NAME}") + dest_conn.commit() + print("✓ Destination table cleared") + + # Build query with optional LIMIT + query = f"SELECT * FROM {TABLE_NAME}" + if LIMIT > 0: + query += f" LIMIT {LIMIT}" + + print(f"Executing query: {query}") + source_cursor.execute(query) + + rows = 0 + errors = 0 + start = time.time() + last_report = start + + # Fetch and insert rows + print("Starting copy...") + while True: + batch = source_cursor.fetchmany(BATCH_SIZE) + if not batch: + break + + for row in batch: + try: + insert_sql = generate_insert(row) + dest_cursor.execute(insert_sql) + rows += 1 + except Exception as e: + errors += 1 + if errors <= 3: + print(f"Error inserting row {rows+1}: {e}") + if errors == 1: + print(f" Sample INSERT (first 300 chars): {insert_sql[:300]}...") + + # Commit batch + dest_conn.commit() + + # Progress reporting every 1000 rows or 5 seconds + now = time.time() + if rows % 1000 == 0 or (now - last_report) >= 5: + elapsed = now - start + rate = rows / elapsed if elapsed > 0 else 0 + print(f" Processed {rows} rows ({rate:.1f} rows/sec)") + last_report = now + + # Final commit + dest_conn.commit() + + elapsed = time.time() - start + print(f"\n✓ Copy completed:") + print(f" Rows copied: {rows}") + print(f" Errors: {errors}") + print(f" Time: {elapsed:.1f}s") + if elapsed > 0: + print(f" Rate: {rows/elapsed:.1f} rows/sec") + + # Verify counts if no errors + if errors == 0: + # Get source count + if LIMIT > 
0: + expected = min(LIMIT, rows) + else: + source_cursor.execute(f"SELECT COUNT(*) FROM {TABLE_NAME}") + expected = source_cursor.fetchone()[0] + + dest_cursor.execute(f"SELECT COUNT(*) FROM {TABLE_NAME}") + actual = dest_cursor.fetchone()[0] + + print(f"\n✓ Verification:") + print(f" Expected rows: {expected}") + print(f" Actual rows: {actual}") + if expected == actual: + print(f" ✓ Counts match!") + else: + print(f" ✗ Count mismatch!") + + except Exception as e: + print(f"\n✗ Error during copy: {e}") + sys.exit(1) + finally: + # Cleanup + source_cursor.close() + source_conn.close() + dest_cursor.close() + dest_conn.close() + print("\nConnections closed.") + +if __name__ == "__main__": + main() \ No newline at end of file From 8e8363576021daa322aac2f9f2da9df55d4ec140 Mon Sep 17 00:00:00 2001 From: Rene Cannao Date: Tue, 23 Dec 2025 19:42:57 +0000 Subject: [PATCH 2/6] Add Posts embeddings setup documentation with optimized batch processing This documentation provides step-by-step instructions for setting up virtual tables for Posts embeddings using sqlite-rembed and sqlite-vec. Key features: - Virtual table creation for 768-dimensional embeddings - API client configuration for embedding generation - Optimized batch query using LEFT JOIN to process unembedded rows - Batch processing script with progress tracking - Similarity search examples - Performance considerations and troubleshooting The batch query uses a LEFT JOIN pattern to find unprocessed rows: INSERT ... SELECT ... FROM Posts LEFT JOIN Posts_embeddings WHERE Posts_embeddings.rowid IS NULL LIMIT 1000; This approach eliminates the need for tracking rowid ranges and can be run repeatedly until all 248,905 Posts have embeddings generated. 
--- doc/posts-embeddings-setup.md | 295 ++++++++++++++++++++++++++++++++++ 1 file changed, 295 insertions(+) create mode 100644 doc/posts-embeddings-setup.md diff --git a/doc/posts-embeddings-setup.md b/doc/posts-embeddings-setup.md new file mode 100644 index 0000000000..23d2d7962e --- /dev/null +++ b/doc/posts-embeddings-setup.md @@ -0,0 +1,295 @@ +# Posts Table Embeddings Setup Guide + +This guide explains how to set up and populate virtual tables for storing and searching embeddings of the Posts table content using sqlite-rembed and sqlite-vec extensions in ProxySQL. + +## Prerequisites + +1. **ProxySQL** running with SQLite3 backend enabled (`--sqlite3-server` flag) +2. **Posts table** copied from MySQL to SQLite3 server (248,905 rows) + - Use `scripts/copy_stackexchange_Posts_mysql_to_sqlite3.py` if not already copied +3. **Valid API credentials** for embedding generation +4. **Network access** to embedding API endpoint + +## Setup Steps + +### Step 1: Create Virtual Vector Table + +Create a virtual table for storing 768-dimensional embeddings (matching nomic-embed-text-v1.5 model output): + +```sql +-- Create virtual vector table for Posts embeddings +CREATE VIRTUAL TABLE Posts_embeddings USING vec0( + embedding float[768] +); +``` + +### Step 2: Configure API Client + +Configure an embedding API client using the `temp.rembed_clients` virtual table: + +```sql +-- Configure embedding API client +-- Replace YOUR_API_KEY with actual API key +INSERT INTO temp.rembed_clients(name, options) VALUES + ('posts-embed-client', + rembed_client_options( + 'format', 'openai', + 'url', 'https://api.synthetic.new/openai/v1/embeddings', + 'key', 'YOUR_API_KEY', + 'model', 'hf:nomic-ai/nomic-embed-text-v1.5' + ) + ); +``` + +### Step 3: Generate and Insert Embeddings + +#### For Testing (First 100 rows) + +```sql +-- Generate embeddings for first 100 Posts +INSERT OR REPLACE INTO Posts_embeddings(rowid, embedding) +SELECT rowid, rembed('posts-embed-client', + COALESCE(Title || 
' ', '') || Body) as embedding +FROM Posts +LIMIT 100; +``` + +#### For Full Table (Batch Processing) + +Use this optimized batch query that processes unembedded rows without requiring rowid tracking: + +```sql +-- Batch process unembedded rows (processes ~1000 rows at a time) +INSERT OR REPLACE INTO Posts_embeddings(rowid, embedding) +SELECT Posts.rowid, rembed('posts-embed-client', + COALESCE(Posts.Title || ' ', '') || Posts.Body) as embedding +FROM Posts +LEFT JOIN Posts_embeddings ON Posts.rowid = Posts_embeddings.rowid +WHERE Posts_embeddings.rowid IS NULL +LIMIT 1000; +``` + +**Key features of this batch query:** +- Uses `LEFT JOIN` to find Posts without existing embeddings +- `WHERE Posts_embeddings.rowid IS NULL` filters for unprocessed rows +- `LIMIT 1000` controls batch size +- Can be run repeatedly until all rows are processed +- No need to track which rowids have been processed + +### Step 4: Verify Embeddings + +```sql +-- Check total embeddings count +SELECT COUNT(*) as total_embeddings FROM Posts_embeddings; + +-- Check embedding size (should be 3072 bytes: 768 dimensions × 4 bytes) +SELECT rowid, length(embedding) as embedding_size_bytes +FROM Posts_embeddings LIMIT 3; + +-- Check percentage of Posts with embeddings +SELECT + (SELECT COUNT(*) FROM Posts_embeddings) as with_embeddings, + (SELECT COUNT(*) FROM Posts) as total_posts, + ROUND( + (SELECT COUNT(*) FROM Posts_embeddings) * 100.0 / + (SELECT COUNT(*) FROM Posts), 2 + ) as percentage_complete; +``` + +## Batch Processing Strategy for 248,905 Rows + +### Recommended Approach + +1. **Run the batch query repeatedly** until all rows have embeddings +2. **Add delays between batches** to avoid API rate limiting +3. 
**Monitor progress** using the verification queries above + +### Example Shell Script for Batch Processing + +```bash +#!/bin/bash +# process_posts_embeddings.sh + +PROXYSQL_HOST="127.0.0.1" +PROXYSQL_PORT="6030" +MYSQL_USER="root" +MYSQL_PASS="root" +BATCH_SIZE=1000 +DELAY_SECONDS=5 + +echo "Starting Posts embeddings generation..." + +while true; do + # Execute batch query + mysql -h "$PROXYSQL_HOST" -P "$PROXYSQL_PORT" -u "$MYSQL_USER" -p"$MYSQL_PASS" << EOF + INSERT OR REPLACE INTO Posts_embeddings(rowid, embedding) + SELECT Posts.rowid, rembed('posts-embed-client', + COALESCE(Posts.Title || ' ', '') || Posts.Body) as embedding + FROM Posts + LEFT JOIN Posts_embeddings ON Posts.rowid = Posts_embeddings.rowid + WHERE Posts_embeddings.rowid IS NULL + LIMIT $BATCH_SIZE; +EOF + + # Check if any rows were processed + PROCESSED=$(mysql -h "$PROXYSQL_HOST" -P "$PROXYSQL_PORT" -u "$MYSQL_USER" -p"$MYSQL_PASS" -s -N << EOF + SELECT COUNT(*) FROM Posts_embeddings; +EOF) + + TOTAL=$(mysql -h "$PROXYSQL_HOST" -P "$PROXYSQL_PORT" -u "$MYSQL_USER" -p"$MYSQL_PASS" -s -N << EOF + SELECT COUNT(*) FROM Posts; +EOF) + + PERCENTAGE=$(echo "scale=2; $PROCESSED * 100 / $TOTAL" | bc) + echo "Processed: $PROCESSED/$TOTAL rows ($PERCENTAGE%)" + + # Break if all rows processed + if [ "$PROCESSED" -eq "$TOTAL" ]; then + echo "All rows processed!" + break + fi + + # Wait before next batch + echo "Waiting $DELAY_SECONDS seconds before next batch..." 
+ sleep $DELAY_SECONDS +done +``` + +## Similarity Search Examples + +Once embeddings are generated, you can perform semantic search: + +### Example 1: Find Similar Posts + +```sql +-- Find Posts similar to a query about databases +SELECT p.SiteId, p.Id as PostId, p.Title, e.distance, + substr(p.Body, 1, 100) as body_preview +FROM ( + SELECT rowid, distance + FROM Posts_embeddings + WHERE embedding MATCH rembed('posts-embed-client', + 'database systems and SQL queries') + LIMIT 5 +) e +JOIN Posts p ON e.rowid = p.rowid +ORDER BY e.distance; +``` + +### Example 2: Find Posts Similar to Specific Post + +```sql +-- Find Posts similar to the Post with rowid 1 +SELECT p2.SiteId, p2.Id as PostId, p2.Title, e.distance, + substr(p2.Body, 1, 100) as body_preview +FROM ( + SELECT rowid, distance + FROM Posts_embeddings + WHERE embedding MATCH ( + SELECT embedding + FROM Posts_embeddings + WHERE rowid = 1 -- Change to target Post rowid + ) + AND rowid != 1 + LIMIT 5 +) e +JOIN Posts p2 ON e.rowid = p2.rowid +ORDER BY e.distance; +``` + +## Performance Considerations + +1. **API Rate Limiting**: The `rembed()` function makes HTTP requests to the API + - Batch size of 1000 with 5-second delays is conservative + - Adjust based on API rate limits + - Monitor API usage and costs + +2. **Embedding Storage**: + - Each embedding: 768 dimensions × 4 bytes = 3,072 bytes + - Full table (248,905 rows): ~765 MB + - Ensure sufficient disk space + +3. **Search Performance**: + - `vec0` virtual tables perform exact (brute-force) nearest neighbor search, scanning every stored vector + - Performance scales with number of vectors and dimensions + - Use `LIMIT` clauses to control result size + +## Troubleshooting + +### Common Issues + +1. **API Connection Errors** + - Verify API key is valid and has quota + - Check network connectivity to API endpoint + - Confirm API endpoint URL is correct + +2. 
**Embedding Generation Failures** + - Check `temp.rembed_clients` configuration + - Verify client name matches in `rembed()` calls + - Test with simple text first: `SELECT rembed('posts-embed-client', 'test');` + +3. **Batch Processing Stalls** + - Check if API rate limits are being hit + - Increase delay between batches + - Reduce batch size + +4. **Memory Issues** + - Large batches may consume significant memory + - Reduce batch size if encountering memory errors + - Monitor ProxySQL memory usage + +### Verification Queries + +```sql +-- Check API client configuration +SELECT name, json_extract(options, '$.format') as format, + json_extract(options, '$.model') as model +FROM temp.rembed_clients; + +-- Test embedding generation +SELECT length(rembed('posts-embed-client', 'test text')) as test_embedding_size; + +-- Check for embedding generation errors +SELECT rowid FROM Posts_embeddings WHERE length(embedding) != 3072; +``` + +## Maintenance + +### Adding New Posts + +When new Posts are added to the table: + +```sql +-- Generate embeddings for new Posts +INSERT OR REPLACE INTO Posts_embeddings(rowid, embedding) +SELECT Posts.rowid, rembed('posts-embed-client', + COALESCE(Posts.Title || ' ', '') || Posts.Body) as embedding +FROM Posts +LEFT JOIN Posts_embeddings ON Posts.rowid = Posts_embeddings.rowid +WHERE Posts_embeddings.rowid IS NULL; +``` + +### Recreating Virtual Table + +If you need to recreate the virtual table: + +```sql +-- Drop existing table +DROP TABLE IF EXISTS Posts_embeddings; + +-- Recreate with same schema +CREATE VIRTUAL TABLE Posts_embeddings USING vec0( + embedding float[768] +); +``` + +## Related Resources + +1. [sqlite-rembed Integration Documentation](./sqlite-rembed-integration.md) +2. [SQLite3 Server Documentation](./SQLite3-Server.md) +3. [Vector Search Testing](../doc/vector-search-test/README.md) +4. 
[Copy Script](../scripts/copy_stackexchange_Posts_mysql_to_sqlite3.py) + +--- + +*Last Updated: 2025-12-23* \ No newline at end of file From 36a59f3f56e7e3af77689f6d995d5476220be8b0 Mon Sep 17 00:00:00 2001 From: Rene Cannao Date: Wed, 24 Dec 2025 03:59:35 +0000 Subject: [PATCH 3/6] Add Posts embeddings processing script with exponential backoff New script: scripts/process_posts_embeddings.py Features: - Connects to SQLite3 server via MySQL connector with configurable credentials - Configures API client using API_KEY environment variable (fails if not set) - Processes unembedded Posts rows in configurable batch sizes (default: 10) - Uses LEFT JOIN WHERE IS NULL pattern to track unprocessed rows - Implements exponential backoff for retry failures with 5-minute maximum cap - Shows progress: remaining rows, processed count, percentage complete - Fails if Posts_embeddings table doesn't exist (no automatic creation) - Handles concurrent processing race conditions with small delays Script prerequisites: 1. Posts table must exist (copied from MySQL) 2. Posts_embeddings virtual table must exist: CREATE VIRTUAL TABLE Posts_embeddings USING vec0(embedding float[768]); Backoff behavior: - Default retry delay: 5 seconds - Exponential increase: 5s, 10s, 20s, 40s, ... up to 300s maximum - Resets on any successful operation (even if 0 rows processed) --- scripts/process_posts_embeddings.py | 290 ++++++++++++++++++++++++++++ 1 file changed, 290 insertions(+) create mode 100755 scripts/process_posts_embeddings.py diff --git a/scripts/process_posts_embeddings.py b/scripts/process_posts_embeddings.py new file mode 100755 index 0000000000..57fdda8071 --- /dev/null +++ b/scripts/process_posts_embeddings.py @@ -0,0 +1,290 @@ +#!/usr/bin/env python3 +""" +Process Posts table embeddings using sqlite-rembed in ProxySQL SQLite3 server. + +Connects to SQLite3 server via MySQL connector, configures API client, +and processes unembedded Posts rows in batches of 10. + +Prerequisites: +1. 
Posts table must exist (copied from MySQL) +2. Posts_embeddings virtual table must exist: + CREATE VIRTUAL TABLE Posts_embeddings USING vec0(embedding float[768]); + +Environment variable API_KEY must be set for API authentication. +If Posts_embeddings table doesn't exist, the script will fail. +""" + +import os +import sys +import time +import argparse +import mysql.connector +from mysql.connector import Error + +def parse_args(): + """Parse command line arguments.""" + parser = argparse.ArgumentParser( + description='Process Posts table embeddings in ProxySQL SQLite3 server' + ) + parser.add_argument('--host', default='127.0.0.1', + help='ProxySQL SQLite3 server host (default: 127.0.0.1)') + parser.add_argument('--port', type=int, default=6030, + help='ProxySQL SQLite3 server port (default: 6030)') + parser.add_argument('--user', default='root', + help='Database user (default: root)') + parser.add_argument('--password', default='root', + help='Database password (default: root)') + parser.add_argument('--database', default='main', + help='Database name (default: main)') + parser.add_argument('--client-name', default='posts-embed-client', + help='rembed client name (default: posts-embed-client)') + parser.add_argument('--api-format', default='openai', + help='API format (default: openai)') + parser.add_argument('--api-url', default='https://api.synthetic.new/openai/v1/embeddings', + help='API endpoint URL') + parser.add_argument('--api-model', default='hf:nomic-ai/nomic-embed-text-v1.5', + help='Embedding model') + parser.add_argument('--batch-size', type=int, default=10, + help='Batch size for embedding generation (default: 10)') + parser.add_argument('--retry-delay', type=int, default=5, + help='Delay in seconds on error (default: 5)') + + return parser.parse_args() + +def check_env(): + """Check required environment variables.""" + api_key = os.getenv('API_KEY') + if not api_key: + print("ERROR: API_KEY environment variable must be set") + print("Usage: export 
API_KEY='your-api-key'") + sys.exit(1) + return api_key + +def connect_db(args): + """Connect to SQLite3 server using MySQL connector.""" + try: + conn = mysql.connector.connect( + host=args.host, + port=args.port, + user=args.user, + password=args.password, + database=args.database, + use_pure=True, + ssl_disabled=True + ) + return conn + except Error as e: + print(f"ERROR: Failed to connect to database: {e}") + sys.exit(1) + +def configure_client(conn, args, api_key): + """Configure rembed API client.""" + cursor = conn.cursor() + + insert_sql = f""" + INSERT INTO temp.rembed_clients(name, options) VALUES + ( + '{args.client_name}', + rembed_client_options( + 'format', '{args.api_format}', + 'url', '{args.api_url}', + 'key', '{api_key}', + 'model', '{args.api_model}' + ) + ); + """ + + try: + cursor.execute(insert_sql) + conn.commit() + print(f"✓ Configured API client '{args.client_name}'") + except Error as e: + print(f"ERROR: Failed to configure API client: {e}") + print(f"SQL: {insert_sql[:200]}...") + cursor.close() + sys.exit(1) + + cursor.close() + + +def get_remaining_count(conn): + """Get count of Posts without embeddings.""" + cursor = conn.cursor() + + count_sql = """ + SELECT COUNT(*) + FROM Posts + LEFT JOIN Posts_embeddings ON Posts.rowid = Posts_embeddings.rowid + WHERE Posts_embeddings.rowid IS NULL; + """ + + try: + cursor.execute(count_sql) + result = cursor.fetchone() + if result and result[0] is not None: + remaining = int(result[0]) + else: + remaining = 0 + cursor.close() + return remaining + except Error as e: + print(f"ERROR: Failed to count remaining rows: {e}") + cursor.close() + raise + +def get_total_posts(conn): + """Get total number of Posts.""" + cursor = conn.cursor() + + try: + cursor.execute("SELECT COUNT(*) FROM Posts;") + result = cursor.fetchone() + if result and result[0] is not None: + total = int(result[0]) + else: + total = 0 + cursor.close() + return total + except Error as e: + print(f"ERROR: Failed to count total Posts: 
{e}") + cursor.close() + raise + +def process_batch(conn, args): + """Process a batch of unembedded Posts.""" + cursor = conn.cursor() + + insert_sql = f""" + INSERT OR REPLACE INTO Posts_embeddings(rowid, embedding) + SELECT Posts.rowid, rembed('{args.client_name}', + COALESCE(Posts.Title || ' ', '') || Posts.Body) as embedding + FROM Posts + LEFT JOIN Posts_embeddings ON Posts.rowid = Posts_embeddings.rowid + WHERE Posts_embeddings.rowid IS NULL + LIMIT {args.batch_size}; + """ + + try: + cursor.execute(insert_sql) + conn.commit() + processed = cursor.rowcount + cursor.close() + return processed, None + except Error as e: + cursor.close() + return 0, str(e) + +def main(): + """Main processing loop.""" + args = parse_args() + api_key = check_env() + + print("=" * 60) + print("Posts Table Embeddings Processor") + print("=" * 60) + print(f"Host: {args.host}:{args.port}") + print(f"Database: {args.database}") + print(f"API Client: {args.client_name}") + print(f"Batch Size: {args.batch_size}") + print(f"API URL: {args.api_url}") + print(f"Model: {args.api_model}") + print("=" * 60) + + # Connect to database + conn = connect_db(args) + + # Configure API client + configure_client(conn, args, api_key) + + # Get initial counts + try: + total_posts = get_total_posts(conn) + remaining = get_remaining_count(conn) + processed = total_posts - remaining + + print(f"\nInitial status:") + print(f" Total Posts: {total_posts}") + print(f" Already embedded: {processed}") + print(f" Remaining: {remaining}") + print("-" * 40) + except Error as e: + print(f"ERROR: Failed to get initial counts: {e}") + conn.close() + sys.exit(1) + + if remaining == 0: + print("✓ All Posts already have embeddings. 
Nothing to do.") + conn.close() + sys.exit(0) + + # Main processing loop + iteration = 0 + total_processed = processed + consecutive_failures = 0 + MAX_BACKOFF_SECONDS = 300 # 5 minutes maximum backoff + + while True: + iteration += 1 + + # Get current remaining count + try: + remaining = get_remaining_count(conn) + except Error as e: + print(f"ERROR: Failed to get remaining count: {e}") + conn.close() + sys.exit(1) + + if remaining == 0: + print(f"\n✓ All {total_posts} Posts have embeddings!") + break + + # Show progress + if total_posts > 0: + progress_percent = (total_processed / total_posts) * 100 + progress_str = f" ({progress_percent:.1f}%)" + else: + progress_str = "" + print(f"\nIteration {iteration}:") + print(f" Remaining: {remaining}") + print(f" Processed: {total_processed}/{total_posts}{progress_str}") + + # Process batch + processed_count, error = process_batch(conn, args) + + if error: + consecutive_failures += 1 + backoff_delay = min(args.retry_delay * (2 ** (consecutive_failures - 1)), MAX_BACKOFF_SECONDS) + print(f" ✗ Batch failed: {error}") + print(f" Consecutive failures: {consecutive_failures}") + print(f" Waiting {backoff_delay} seconds before retry...") + time.sleep(backoff_delay) + continue + + # Reset consecutive failures on any successful operation (even if no rows processed) + consecutive_failures = 0 + + if processed_count > 0: + total_processed += processed_count + print(f" ✓ Processed {processed_count} rows") + # Continue immediately (no delay on success) + else: + print(f" ⓘ No rows processed (possibly concurrent process?)") + # Small delay if no rows processed (could be race condition) + time.sleep(1) + + # Final summary + print("\n" + "=" * 60) + print("Processing Complete!") + print(f"Total Posts: {total_posts}") + print(f"Total with embeddings: {total_processed}") + if total_posts > 0: + success_percent = (total_processed / total_posts) * 100 + print(f"Success rate: {success_percent:.1f}%") + else: + print("Success rate: N/A (no 
posts)") + print("=" * 60) + + conn.close() + +if __name__ == "__main__": + main() \ No newline at end of file From ffdb334dc30929103603f1bbdb02cdeed8f73373 Mon Sep 17 00:00:00 2001 From: Rene Cannao Date: Wed, 24 Dec 2025 05:41:46 +0000 Subject: [PATCH 4/6] Add WHERE filters to prevent empty input errors and fix SQL syntax Changes: - Filter Posts by PostTypeId IN (1,2) (Questions and Answers) - Filter by minimum text length > 30 characters (Title + Body) - Update get_total_posts to count only eligible posts for accurate progress - Fix SQL syntax error in process_batch WHERE clause - Update documentation with filter details Rationale: - Empty or very short text causes embedding generation failures - PostTypeId 1,2 are most relevant content (Questions and Answers) - Ensures consistent counting between total, remaining, and processed --- scripts/process_posts_embeddings.py | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/scripts/process_posts_embeddings.py b/scripts/process_posts_embeddings.py index 57fdda8071..c736588b5f 100755 --- a/scripts/process_posts_embeddings.py +++ b/scripts/process_posts_embeddings.py @@ -5,6 +5,10 @@ Connects to SQLite3 server via MySQL connector, configures API client, and processes unembedded Posts rows in batches of 10. +Filters applied: +- Only PostTypeId IN (1,2) (Questions and Answers) +- Minimum text length > 30 characters (Title + Body) + Prerequisites: 1. Posts table must exist (copied from MySQL) 2. 
Posts_embeddings virtual table must exist: @@ -115,7 +119,9 @@ def get_remaining_count(conn): SELECT COUNT(*) FROM Posts LEFT JOIN Posts_embeddings ON Posts.rowid = Posts_embeddings.rowid - WHERE Posts_embeddings.rowid IS NULL; + WHERE Posts.PostTypeId IN (1,2) + AND LENGTH(COALESCE(Posts.Title || '', '') || Posts.Body) > 30 + AND Posts_embeddings.rowid IS NULL; """ try: @@ -133,11 +139,16 @@ def get_remaining_count(conn): raise def get_total_posts(conn): - """Get total number of Posts.""" + """Get total number of eligible Posts (PostTypeId 1,2 with text length > 30).""" cursor = conn.cursor() try: - cursor.execute("SELECT COUNT(*) FROM Posts;") + cursor.execute(""" + SELECT COUNT(*) + FROM Posts + WHERE PostTypeId IN (1,2) + AND LENGTH(COALESCE(Posts.Title || '', '') || Posts.Body) > 30; + """) result = cursor.fetchone() if result and result[0] is not None: total = int(result[0]) @@ -160,7 +171,9 @@ def process_batch(conn, args): COALESCE(Posts.Title || ' ', '') || Posts.Body) as embedding FROM Posts LEFT JOIN Posts_embeddings ON Posts.rowid = Posts_embeddings.rowid - WHERE Posts_embeddings.rowid IS NULL + WHERE Posts.PostTypeId IN (1,2) + AND LENGTH(COALESCE(Posts.Title || '', '') || Posts.Body) > 30 + AND Posts_embeddings.rowid IS NULL LIMIT {args.batch_size}; """ @@ -287,4 +300,4 @@ def main(): conn.close() if __name__ == "__main__": - main() \ No newline at end of file + main() From 4aba7137b4cde887f5355d3382ec415b358371ca Mon Sep 17 00:00:00 2001 From: Rene Cannao Date: Wed, 24 Dec 2025 05:57:29 +0000 Subject: [PATCH 5/6] Add --local-ollama option for local Ollama server support New option: --local-ollama - Uses Ollama format with localhost:11434 API endpoint - Model: nomic-embed-text-v1.5 (without hf: prefix) - No API_KEY environment variable required - Overrides api-format, api-url, and api-model flags Changes: 1. Add --local-ollama boolean flag to parse_args() 2. Modify check_env() to skip API_KEY check when local-ollama is set 3. 
Update configure_client() to generate Ollama-specific SQL without 'key' parameter 4. Update main() to display correct configuration based on mode 5. Update documentation with local Ollama usage Behavior: - Without --local-ollama: Requires API_KEY, uses remote API with configurable format/url/model - With --local-ollama: No API_KEY needed, uses fixed local Ollama configuration --- scripts/process_posts_embeddings.py | 61 +++++++++++++++++++++-------- 1 file changed, 44 insertions(+), 17 deletions(-) diff --git a/scripts/process_posts_embeddings.py b/scripts/process_posts_embeddings.py index c736588b5f..3bd9513f1f 100755 --- a/scripts/process_posts_embeddings.py +++ b/scripts/process_posts_embeddings.py @@ -14,7 +14,8 @@ 2. Posts_embeddings virtual table must exist: CREATE VIRTUAL TABLE Posts_embeddings USING vec0(embedding float[768]); -Environment variable API_KEY must be set for API authentication. +For remote API: Environment variable API_KEY must be set for API authentication. +For local Ollama: Use --local-ollama flag (no API_KEY required). If Posts_embeddings table doesn't exist, the script will fail. 
""" @@ -52,11 +53,16 @@ def parse_args(): help='Batch size for embedding generation (default: 10)') parser.add_argument('--retry-delay', type=int, default=5, help='Delay in seconds on error (default: 5)') + parser.add_argument('--local-ollama', action='store_true', + help='Use local Ollama server instead of remote API (no API_KEY required)') return parser.parse_args() -def check_env(): +def check_env(args): """Check required environment variables.""" + if args.local_ollama: + # Local Ollama doesn't require API key + return None api_key = os.getenv('API_KEY') if not api_key: print("ERROR: API_KEY environment variable must be set") @@ -85,18 +91,33 @@ def configure_client(conn, args, api_key): """Configure rembed API client.""" cursor = conn.cursor() - insert_sql = f""" - INSERT INTO temp.rembed_clients(name, options) VALUES - ( - '{args.client_name}', - rembed_client_options( - 'format', '{args.api_format}', - 'url', '{args.api_url}', - 'key', '{api_key}', - 'model', '{args.api_model}' - ) - ); - """ + if args.local_ollama: + # Local Ollama configuration + insert_sql = f""" + INSERT INTO temp.rembed_clients(name, options) VALUES + ( + '{args.client_name}', + rembed_client_options( + 'format', 'ollama', + 'url', 'http://localhost:11434/api/embeddings', + 'model', 'nomic-embed-text-v1.5' + ) + ); + """ + else: + # Remote API configuration + insert_sql = f""" + INSERT INTO temp.rembed_clients(name, options) VALUES + ( + '{args.client_name}', + rembed_client_options( + 'format', '{args.api_format}', + 'url', '{args.api_url}', + 'key', '{api_key}', + 'model', '{args.api_model}' + ) + ); + """ try: cursor.execute(insert_sql) @@ -190,7 +211,7 @@ def process_batch(conn, args): def main(): """Main processing loop.""" args = parse_args() - api_key = check_env() + api_key = check_env(args) print("=" * 60) print("Posts Table Embeddings Processor") @@ -199,8 +220,14 @@ def main(): print(f"Database: {args.database}") print(f"API Client: {args.client_name}") print(f"Batch Size: 
{args.batch_size}") - print(f"API URL: {args.api_url}") - print(f"Model: {args.api_model}") + if args.local_ollama: + print(f"Mode: Local Ollama") + print(f"URL: http://localhost:11434/api/embeddings") + print(f"Model: nomic-embed-text-v1.5") + else: + print(f"Mode: Remote API") + print(f"API URL: {args.api_url}") + print(f"Model: {args.api_model}") print("=" * 60) # Connect to database From 221831afc12a6aa7d4c5e4be1f8acb357149a9fc Mon Sep 17 00:00:00 2001 From: Rene Cannao Date: Wed, 24 Dec 2025 06:00:39 +0000 Subject: [PATCH 6/6] Add usage examples to script documentation and help output Changes: 1. Add comprehensive usage examples to script docstring with: - Remote API example (requires API_KEY environment variable) - Local Ollama example (uses --local-ollama flag) - Both examples show all common command line options 2. Add epilog to argparse help with concise examples: - Shows minimal command line for both modes - Points to docstring for full examples - Uses RawDescriptionHelpFormatter for proper formatting Users now have multiple ways to access usage information: - Read script header with `head -50 scripts/process_posts_embeddings.py` - Run `python3 scripts/process_posts_embeddings.py --help` - Both show appropriate examples for remote API and local Ollama modes --- scripts/process_posts_embeddings.py | 40 ++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/scripts/process_posts_embeddings.py b/scripts/process_posts_embeddings.py index 3bd9513f1f..cddfb495af 100755 --- a/scripts/process_posts_embeddings.py +++ b/scripts/process_posts_embeddings.py @@ -17,6 +17,30 @@ For remote API: Environment variable API_KEY must be set for API authentication. For local Ollama: Use --local-ollama flag (no API_KEY required). If Posts_embeddings table doesn't exist, the script will fail. + +Usage Examples: + +1. 
Remote API (requires API_KEY environment variable): + export API_KEY='your-api-key' + python3 process_posts_embeddings.py \ + --host 127.0.0.1 \ + --port 6030 \ + --user root \ + --password root \ + --database main \ + --client-name posts-embed-client \ + --batch-size 10 + +2. Local Ollama server (no API_KEY required): + python3 process_posts_embeddings.py \ + --local-ollama \ + --host 127.0.0.1 \ + --port 6030 \ + --user root \ + --password root \ + --database main \ + --client-name posts-embed-client \ + --batch-size 10 """ import os @@ -28,8 +52,22 @@ def parse_args(): """Parse command line arguments.""" + epilog = """ +Usage Examples: + +1. Remote API (requires API_KEY environment variable): + export API_KEY='your-api-key' + python3 process_posts_embeddings.py --host 127.0.0.1 --port 6030 + +2. Local Ollama server (no API_KEY required): + python3 process_posts_embeddings.py --local-ollama --host 127.0.0.1 --port 6030 + +See script docstring for full examples with all options. +""" parser = argparse.ArgumentParser( - description='Process Posts table embeddings in ProxySQL SQLite3 server' + description='Process Posts table embeddings in ProxySQL SQLite3 server', + epilog=epilog, + formatter_class=argparse.RawDescriptionHelpFormatter ) parser.add_argument('--host', default='127.0.0.1', help='ProxySQL SQLite3 server host (default: 127.0.0.1)')