diff --git a/doc/posts-embeddings-setup.md b/doc/posts-embeddings-setup.md new file mode 100644 index 0000000000..23d2d7962e --- /dev/null +++ b/doc/posts-embeddings-setup.md @@ -0,0 +1,295 @@ +# Posts Table Embeddings Setup Guide + +This guide explains how to set up and populate virtual tables for storing and searching embeddings of the Posts table content using sqlite-rembed and sqlite-vec extensions in ProxySQL. + +## Prerequisites + +1. **ProxySQL** running with SQLite3 backend enabled (`--sqlite3-server` flag) +2. **Posts table** copied from MySQL to SQLite3 server (248,905 rows) + - Use `scripts/copy_stackexchange_Posts_mysql_to_sqlite3.py` if not already copied +3. **Valid API credentials** for embedding generation +4. **Network access** to embedding API endpoint + +## Setup Steps + +### Step 1: Create Virtual Vector Table + +Create a virtual table for storing 768-dimensional embeddings (matching nomic-embed-text-v1.5 model output): + +```sql +-- Create virtual vector table for Posts embeddings +CREATE VIRTUAL TABLE Posts_embeddings USING vec0( + embedding float[768] +); +``` + +### Step 2: Configure API Client + +Configure an embedding API client using the `temp.rembed_clients` virtual table: + +```sql +-- Configure embedding API client +-- Replace YOUR_API_KEY with actual API key +INSERT INTO temp.rembed_clients(name, options) VALUES + ('posts-embed-client', + rembed_client_options( + 'format', 'openai', + 'url', 'https://api.synthetic.new/openai/v1/embeddings', + 'key', 'YOUR_API_KEY', + 'model', 'hf:nomic-ai/nomic-embed-text-v1.5' + ) + ); +``` + +### Step 3: Generate and Insert Embeddings + +#### For Testing (First 100 rows) + +```sql +-- Generate embeddings for first 100 Posts +INSERT OR REPLACE INTO Posts_embeddings(rowid, embedding) +SELECT rowid, rembed('posts-embed-client', + COALESCE(Title || ' ', '') || Body) as embedding +FROM Posts +LIMIT 100; +``` + +#### For Full Table (Batch Processing) + +Use this optimized batch query that processes 
unembedded rows without requiring rowid tracking: + +```sql +-- Batch process unembedded rows (processes ~1000 rows at a time) +INSERT OR REPLACE INTO Posts_embeddings(rowid, embedding) +SELECT Posts.rowid, rembed('posts-embed-client', + COALESCE(Posts.Title || ' ', '') || Posts.Body) as embedding +FROM Posts +LEFT JOIN Posts_embeddings ON Posts.rowid = Posts_embeddings.rowid +WHERE Posts_embeddings.rowid IS NULL +LIMIT 1000; +``` + +**Key features of this batch query:** +- Uses `LEFT JOIN` to find Posts without existing embeddings +- `WHERE Posts_embeddings.rowid IS NULL` filters for unprocessed rows +- `LIMIT 1000` controls batch size +- Can be run repeatedly until all rows are processed +- No need to track which rowids have been processed + +### Step 4: Verify Embeddings + +```sql +-- Check total embeddings count +SELECT COUNT(*) as total_embeddings FROM Posts_embeddings; + +-- Check embedding size (should be 3072 bytes: 768 dimensions × 4 bytes) +SELECT rowid, length(embedding) as embedding_size_bytes +FROM Posts_embeddings LIMIT 3; + +-- Check percentage of Posts with embeddings +SELECT + (SELECT COUNT(*) FROM Posts_embeddings) as with_embeddings, + (SELECT COUNT(*) FROM Posts) as total_posts, + ROUND( + (SELECT COUNT(*) FROM Posts_embeddings) * 100.0 / + (SELECT COUNT(*) FROM Posts), 2 + ) as percentage_complete; +``` + +## Batch Processing Strategy for 248,905 Rows + +### Recommended Approach + +1. **Run the batch query repeatedly** until all rows have embeddings +2. **Add delays between batches** to avoid API rate limiting +3. **Monitor progress** using the verification queries above + +### Example Shell Script for Batch Processing + +```bash +#!/bin/bash +# process_posts_embeddings.sh + +PROXYSQL_HOST="127.0.0.1" +PROXYSQL_PORT="6030" +MYSQL_USER="root" +MYSQL_PASS="root" +BATCH_SIZE=1000 +DELAY_SECONDS=5 + +echo "Starting Posts embeddings generation..." 
+ +while true; do + # Execute batch query + mysql -h "$PROXYSQL_HOST" -P "$PROXYSQL_PORT" -u "$MYSQL_USER" -p"$MYSQL_PASS" << EOF + INSERT OR REPLACE INTO Posts_embeddings(rowid, embedding) + SELECT Posts.rowid, rembed('posts-embed-client', + COALESCE(Posts.Title || ' ', '') || Posts.Body) as embedding + FROM Posts + LEFT JOIN Posts_embeddings ON Posts.rowid = Posts_embeddings.rowid + WHERE Posts_embeddings.rowid IS NULL + LIMIT $BATCH_SIZE; +EOF + + # Check if any rows were processed + PROCESSED=$(mysql -h "$PROXYSQL_HOST" -P "$PROXYSQL_PORT" -u "$MYSQL_USER" -p"$MYSQL_PASS" -s -N << EOF + SELECT COUNT(*) FROM Posts_embeddings; +EOF) + + TOTAL=$(mysql -h "$PROXYSQL_HOST" -P "$PROXYSQL_PORT" -u "$MYSQL_USER" -p"$MYSQL_PASS" -s -N << EOF + SELECT COUNT(*) FROM Posts; +EOF) + + PERCENTAGE=$(echo "scale=2; $PROCESSED * 100 / $TOTAL" | bc) + echo "Processed: $PROCESSED/$TOTAL rows ($PERCENTAGE%)" + + # Break if all rows processed + if [ "$PROCESSED" -eq "$TOTAL" ]; then + echo "All rows processed!" + break + fi + + # Wait before next batch + echo "Waiting $DELAY_SECONDS seconds before next batch..." 
+ sleep $DELAY_SECONDS +done +``` + +## Similarity Search Examples + +Once embeddings are generated, you can perform semantic search: + +### Example 1: Find Similar Posts + +```sql +-- Find Posts similar to a query about databases +SELECT p.SiteId, p.Id as PostId, p.Title, e.distance, + substr(p.Body, 1, 100) as body_preview +FROM ( + SELECT rowid, distance + FROM Posts_embeddings + WHERE embedding MATCH rembed('posts-embed-client', + 'database systems and SQL queries') + LIMIT 5 +) e +JOIN Posts p ON e.rowid = p.rowid +ORDER BY e.distance; +``` + +### Example 2: Find Posts Similar to Specific Post + +```sql +-- Find Posts similar to Post with ID 1 +SELECT p2.SiteId, p2.Id as PostId, p2.Title, e.distance, + substr(p2.Body, 1, 100) as body_preview +FROM ( + SELECT rowid, distance + FROM Posts_embeddings + WHERE embedding MATCH ( + SELECT embedding + FROM Posts_embeddings + WHERE rowid = 1 -- Change to target Post rowid + ) + AND rowid != 1 + LIMIT 5 +) e +JOIN Posts p2 ON e.rowid = p2.rowid +ORDER BY e.distance; +``` + +## Performance Considerations + +1. **API Rate Limiting**: The `rembed()` function makes HTTP requests to the API + - Batch size of 1000 with 5-second delays is conservative + - Adjust based on API rate limits + - Monitor API usage and costs + +2. **Embedding Storage**: + - Each embedding: 768 dimensions × 4 bytes = 3,072 bytes + - Full table (248,905 rows): ~765 MB + - Ensure sufficient disk space + +3. **Search Performance**: + - `vec0` virtual tables perform exhaustive (brute-force) nearest neighbor search, so results are exact rather than approximate + - Query time grows linearly with the number of vectors and dimensions + - Use `LIMIT` clauses to control result size + +## Troubleshooting + +### Common Issues + +1. **API Connection Errors** + - Verify API key is valid and has quota + - Check network connectivity to API endpoint + - Confirm API endpoint URL is correct + +2. 
**Embedding Generation Failures** + - Check `temp.rembed_clients` configuration + - Verify client name matches in `rembed()` calls + - Test with simple text first: `SELECT rembed('posts-embed-client', 'test');` + +3. **Batch Processing Stalls** + - Check if API rate limits are being hit + - Increase delay between batches + - Reduce batch size + +4. **Memory Issues** + - Large batches may consume significant memory + - Reduce batch size if encountering memory errors + - Monitor ProxySQL memory usage + +### Verification Queries + +```sql +-- Check API client configuration +SELECT name, json_extract(options, '$.format') as format, + json_extract(options, '$.model') as model +FROM temp.rembed_clients; + +-- Test embedding generation +SELECT length(rembed('posts-embed-client', 'test text')) as test_embedding_size; + +-- Check for embedding generation errors +SELECT rowid FROM Posts_embeddings WHERE length(embedding) != 3072; +``` + +## Maintenance + +### Adding New Posts + +When new Posts are added to the table: + +```sql +-- Generate embeddings for new Posts +INSERT OR REPLACE INTO Posts_embeddings(rowid, embedding) +SELECT Posts.rowid, rembed('posts-embed-client', + COALESCE(Posts.Title || ' ', '') || Posts.Body) as embedding +FROM Posts +LEFT JOIN Posts_embeddings ON Posts.rowid = Posts_embeddings.rowid +WHERE Posts_embeddings.rowid IS NULL; +``` + +### Recreating Virtual Table + +If you need to recreate the virtual table: + +```sql +-- Drop existing table +DROP TABLE IF EXISTS Posts_embeddings; + +-- Recreate with same schema +CREATE VIRTUAL TABLE Posts_embeddings USING vec0( + embedding float[768] +); +``` + +## Related Resources + +1. [sqlite-rembed Integration Documentation](./sqlite-rembed-integration.md) +2. [SQLite3 Server Documentation](./SQLite3-Server.md) +3. [Vector Search Testing](../doc/vector-search-test/README.md) +4. 
#!/usr/bin/env python3
"""
Copy Posts table from MySQL to ProxySQL SQLite3 server.
Uses Python MySQL connectors for direct database access.

The MySQL driver (mysql-connector-python) is imported inside ``main()`` so the
SQL-building helpers in this module can be imported and unit-tested on a
machine that does not have the driver installed.
"""

import sys
import time

# Configuration
SOURCE_CONFIG = {
    "host": "127.0.0.1",
    "port": 3306,
    "user": "stackexchange",
    "password": "my-password",
    "database": "stackexchange",
    "use_pure": True,
    "ssl_disabled": True,
}

DEST_CONFIG = {
    "host": "127.0.0.1",
    "port": 6030,
    "user": "root",
    "password": "root",
    "database": "main",
    "use_pure": True,
    "ssl_disabled": True,
}

TABLE_NAME = "Posts"
LIMIT = 0  # 0 for all rows, otherwise limit for testing
BATCH_SIZE = 5000  # Larger batch for full copy
CLEAR_TABLE_FIRST = True  # Delete existing data before copying

COLUMNS = [
    "SiteId", "Id", "PostTypeId", "AcceptedAnswerId", "ParentId",
    "CreationDate", "DeletionDate", "Score", "ViewCount", "Body",
    "OwnerUserId", "OwnerDisplayName", "LastEditorUserId", "LastEditorDisplayName",
    "LastEditDate", "LastActivityDate", "Title", "Tags", "AnswerCount",
    "CommentCount", "FavoriteCount", "ClosedDate", "CommunityOwnedDate", "ContentLicense",
]


def escape_sql_value(value):
    """Render *value* as a SQL literal for the destination INSERT.

    ``None`` becomes ``NULL``; everything else becomes a single-quoted string
    with embedded single quotes doubled. Binary values (``bytes``,
    ``bytearray``, ``memoryview``) are decoded as UTF-8 first, because
    ``str()`` on them would insert their Python repr (``b'...'``) instead of
    the text.
    """
    if value is None:
        return "NULL"
    if isinstance(value, (bytes, bytearray, memoryview)):
        text = bytes(value).decode("utf-8", errors="replace")
    else:
        text = str(value)
    return "'" + text.replace("'", "''") + "'"


def generate_insert(row):
    """Generate INSERT statement for a single row.

    Args:
        row: sequence of values in the same order as ``COLUMNS``.

    Returns:
        The complete ``INSERT INTO ...`` statement as a string.

    Raises:
        ValueError: if *row* does not have exactly one value per column;
            a short or long row would otherwise produce a misaligned INSERT.
    """
    if len(row) != len(COLUMNS):
        raise ValueError(
            f"row has {len(row)} values but {len(COLUMNS)} columns are configured"
        )
    values_str = ", ".join(escape_sql_value(v) for v in row)
    columns_str = ", ".join(COLUMNS)
    return f"INSERT INTO {TABLE_NAME} ({columns_str}) VALUES ({values_str})"


def build_select_query(limit=LIMIT):
    """Build the source SELECT, naming every column explicitly.

    The rows are zipped against ``COLUMNS`` when inserting, so the SELECT must
    return them in that exact order; ``SELECT *`` would depend on the source
    table's physical column order.

    Args:
        limit: maximum number of rows to read; ``0`` (or less) means all rows.
    """
    query = f"SELECT {', '.join(COLUMNS)} FROM {TABLE_NAME}"
    if limit > 0:
        query += f" LIMIT {limit}"
    return query


def _close_quietly(*resources):
    """Close each resource, reporting (not raising) close-time errors.

    Used during cleanup so a failing ``close()`` cannot hide the error that
    caused the cleanup in the first place.
    """
    for resource in resources:
        try:
            resource.close()
        except Exception as exc:
            print(f"Warning: error while closing {type(resource).__name__}: {exc}")


def main():
    """Copy the configured table and exit non-zero if the copy is incomplete."""
    try:
        import mysql.connector
    except ImportError as exc:
        print(f"✗ mysql-connector-python is required: {exc}")
        sys.exit(1)

    print(f"Copying {TABLE_NAME} from MySQL to SQLite3 server...")
    print(f"Source: {SOURCE_CONFIG['host']}:{SOURCE_CONFIG['port']}")
    print(f"Destination: {DEST_CONFIG['host']}:{DEST_CONFIG['port']}")
    if LIMIT > 0:
        print(f"Limit: {LIMIT} rows")
    else:
        print("Copying all rows")

    # Connect to source (MySQL)
    try:
        source_conn = mysql.connector.connect(**SOURCE_CONFIG)
        source_cursor = source_conn.cursor()
        print("✓ Connected to MySQL source")
    except Exception as e:
        print(f"✗ Failed to connect to source MySQL: {e}")
        sys.exit(1)

    # Connect to destination (ProxySQL SQLite3 server)
    try:
        dest_conn = mysql.connector.connect(**DEST_CONFIG)
        dest_cursor = dest_conn.cursor()
        print("✓ Connected to SQLite3 server destination")
    except Exception as e:
        print(f"✗ Failed to connect to destination SQLite3 server: {e}")
        _close_quietly(source_cursor, source_conn)
        sys.exit(1)

    incomplete = False
    try:
        # Clear destination table if requested
        if CLEAR_TABLE_FIRST:
            print("Clearing destination table...")
            dest_cursor.execute(f"DELETE FROM {TABLE_NAME}")
            dest_conn.commit()
            print("✓ Destination table cleared")

        query = build_select_query(LIMIT)
        print(f"Executing query: {query}")
        source_cursor.execute(query)

        rows = 0      # rows inserted successfully
        seen = 0      # rows read from the source (used for error positions)
        errors = 0
        start = time.time()
        last_report = start

        # Fetch and insert rows
        print("Starting copy...")
        while True:
            batch = source_cursor.fetchmany(BATCH_SIZE)
            if not batch:
                break

            for row in batch:
                seen += 1
                insert_sql = None  # stays None if building the statement fails
                try:
                    insert_sql = generate_insert(row)
                    dest_cursor.execute(insert_sql)
                    rows += 1
                except Exception as e:
                    errors += 1
                    if errors <= 3:
                        print(f"Error inserting row {seen}: {e}")
                    if errors == 1 and insert_sql is not None:
                        print(f"  Sample INSERT (first 300 chars): {insert_sql[:300]}...")

            # Commit batch
            dest_conn.commit()

            # Progress reporting every 1000 rows or 5 seconds
            now = time.time()
            if rows % 1000 == 0 or (now - last_report) >= 5:
                elapsed = now - start
                rate = rows / elapsed if elapsed > 0 else 0
                print(f"  Processed {rows} rows ({rate:.1f} rows/sec)")
                last_report = now

        elapsed = time.time() - start
        print("\n✓ Copy completed:")
        print(f"  Rows copied: {rows}")
        print(f"  Errors: {errors}")
        print(f"  Time: {elapsed:.1f}s")
        if elapsed > 0:
            print(f"  Rate: {rows/elapsed:.1f} rows/sec")

        if errors:
            incomplete = True
        else:
            # Verify counts only when every row was inserted
            if LIMIT > 0:
                expected = min(LIMIT, rows)
            else:
                source_cursor.execute(f"SELECT COUNT(*) FROM {TABLE_NAME}")
                expected = source_cursor.fetchone()[0]

            dest_cursor.execute(f"SELECT COUNT(*) FROM {TABLE_NAME}")
            actual = dest_cursor.fetchone()[0]

            print("\n✓ Verification:")
            print(f"  Expected rows: {expected}")
            print(f"  Actual rows: {actual}")
            if expected == actual:
                print("  ✓ Counts match!")
            else:
                print("  ✗ Count mismatch!")
                incomplete = True

    except Exception as e:
        print(f"\n✗ Error during copy: {e}")
        sys.exit(1)
    finally:
        # Cleanup
        _close_quietly(source_cursor, source_conn, dest_cursor, dest_conn)
        print("\nConnections closed.")

    # Let automation see a partial copy through the exit status.
    if incomplete:
        sys.exit(1)


if __name__ == "__main__":
    main()
#!/usr/bin/env python3
r"""
Process Posts table embeddings using sqlite-rembed in ProxySQL SQLite3 server.

Connects to SQLite3 server via MySQL connector, configures API client,
and processes unembedded Posts rows in batches (default batch size: 10).

Filters applied:
- Only PostTypeId IN (1,2) (Questions and Answers)
- Minimum text length > 30 characters (Title + Body)

Prerequisites:
1. Posts table must exist (copied from MySQL)
2. Posts_embeddings virtual table must exist:
   CREATE VIRTUAL TABLE Posts_embeddings USING vec0(embedding float[768]);

For remote API: Environment variable API_KEY must be set for API authentication.
For local Ollama: Use --local-ollama flag (no API_KEY required).
If Posts_embeddings table doesn't exist, the script will fail.

Usage Examples:

1. Remote API (requires API_KEY environment variable):
   export API_KEY='your-api-key'
   python3 process_posts_embeddings.py \
     --host 127.0.0.1 \
     --port 6030 \
     --user root \
     --password root \
     --database main \
     --client-name posts-embed-client \
     --batch-size 10

2. Local Ollama server (no API_KEY required):
   python3 process_posts_embeddings.py \
     --local-ollama \
     --host 127.0.0.1 \
     --port 6030 \
     --user root \
     --password root \
     --database main \
     --client-name posts-embed-client \
     --batch-size 10
"""

import argparse
import os
import sys
import time

try:
    import mysql.connector
    from mysql.connector import Error
except ImportError:
    # Keep the module importable (``--help``, unit tests of the SQL builders)
    # without the driver; connect_db() reports the missing dependency.
    mysql = None

    class Error(Exception):
        """Stand-in so ``except Error`` clauses stay valid without the driver."""


# Fixed settings used by --local-ollama mode.
OLLAMA_URL = "http://localhost:11434/api/embeddings"
OLLAMA_MODEL = "nomic-embed-text-v1.5"

MAX_BACKOFF_SECONDS = 300  # 5 minutes maximum backoff
MAX_STALLED_BATCHES = 10   # successful batches in a row that embed nothing

# Text that is sent to the embedding model for each post.
EMBED_TEXT_SQL = "COALESCE(Posts.Title || ' ', '') || Posts.Body"

# Rows eligible for embedding: questions/answers with more than 30 characters.
ELIGIBLE_SQL = (
    "Posts.PostTypeId IN (1,2)\n"
    "      AND LENGTH(COALESCE(Posts.Title || '', '') || Posts.Body) > 30"
)


def sql_quote(value):
    """Return *value* as a single-quoted SQL string literal.

    Embedded single quotes are doubled, so user-supplied values (client name,
    URL, model, API key) cannot terminate the literal early.
    """
    return "'" + str(value).replace("'", "''") + "'"


def positive_int(text):
    """argparse ``type=`` callable accepting only integers greater than zero."""
    value = int(text)
    if value <= 0:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {text!r}")
    return value


def parse_args(argv=None):
    """Parse command line arguments.

    Args:
        argv: argument list to parse; ``None`` (the default) reads
            ``sys.argv[1:]`` exactly as before.

    Returns:
        The populated ``argparse.Namespace``.
    """
    epilog = """
Usage Examples:

1. Remote API (requires API_KEY environment variable):
   export API_KEY='your-api-key'
   python3 process_posts_embeddings.py --host 127.0.0.1 --port 6030

2. Local Ollama server (no API_KEY required):
   python3 process_posts_embeddings.py --local-ollama --host 127.0.0.1 --port 6030

See script docstring for full examples with all options.
"""
    parser = argparse.ArgumentParser(
        description='Process Posts table embeddings in ProxySQL SQLite3 server',
        epilog=epilog,
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument('--host', default='127.0.0.1',
                        help='ProxySQL SQLite3 server host (default: 127.0.0.1)')
    parser.add_argument('--port', type=int, default=6030,
                        help='ProxySQL SQLite3 server port (default: 6030)')
    parser.add_argument('--user', default='root',
                        help='Database user (default: root)')
    parser.add_argument('--password', default='root',
                        help='Database password (default: root)')
    parser.add_argument('--database', default='main',
                        help='Database name (default: main)')
    parser.add_argument('--client-name', default='posts-embed-client',
                        help='rembed client name (default: posts-embed-client)')
    parser.add_argument('--api-format', default='openai',
                        help='API format (default: openai)')
    parser.add_argument('--api-url', default='https://api.synthetic.new/openai/v1/embeddings',
                        help='API endpoint URL')
    parser.add_argument('--api-model', default='hf:nomic-ai/nomic-embed-text-v1.5',
                        help='Embedding model')
    # A zero LIMIT would never make progress and a negative LIMIT means
    # "no limit" in SQLite, so only positive batch sizes are accepted.
    parser.add_argument('--batch-size', type=positive_int, default=10,
                        help='Batch size for embedding generation (default: 10)')
    parser.add_argument('--retry-delay', type=int, default=5,
                        help='Delay in seconds on error (default: 5)')
    parser.add_argument('--local-ollama', action='store_true',
                        help='Use local Ollama server instead of remote API (no API_KEY required)')

    return parser.parse_args(argv)


def check_env(args):
    """Return the API key from ``API_KEY``, or ``None`` in --local-ollama mode.

    Exits with status 1 when a remote API is selected and ``API_KEY`` is unset
    or empty.
    """
    if args.local_ollama:
        # Local Ollama doesn't require API key
        return None
    api_key = os.getenv('API_KEY')
    if not api_key:
        print("ERROR: API_KEY environment variable must be set")
        print("Usage: export API_KEY='your-api-key'")
        sys.exit(1)
    return api_key


def connect_db(args):
    """Connect to SQLite3 server using MySQL connector.

    Exits with status 1 if the driver is not installed or the connection fails.
    """
    if mysql is None:
        print("ERROR: mysql-connector-python is not installed "
              "(pip install mysql-connector-python)")
        sys.exit(1)
    try:
        return mysql.connector.connect(
            host=args.host,
            port=args.port,
            user=args.user,
            password=args.password,
            database=args.database,
            use_pure=True,
            ssl_disabled=True
        )
    except Error as e:
        print(f"ERROR: Failed to connect to database: {e}")
        sys.exit(1)


def build_client_sql(args, api_key):
    """Build the INSERT that registers the rembed client.

    Args:
        args: parsed arguments (``client_name``, ``local_ollama`` and, for the
            remote mode, ``api_format`` / ``api_url`` / ``api_model``).
        api_key: key for the remote API; ignored in --local-ollama mode.

    Returns:
        The SQL statement, with every value quoted via ``sql_quote``.
    """
    if args.local_ollama:
        options = [
            ('format', 'ollama'),
            ('url', OLLAMA_URL),
            ('model', OLLAMA_MODEL),
        ]
    else:
        options = [
            ('format', args.api_format),
            ('url', args.api_url),
            ('key', api_key),
            ('model', args.api_model),
        ]
    option_sql = ",\n".join(
        f"            {sql_quote(name)}, {sql_quote(value)}" for name, value in options
    )
    return (
        "INSERT INTO temp.rembed_clients(name, options) VALUES\n"
        "    (\n"
        f"        {sql_quote(args.client_name)},\n"
        "        rembed_client_options(\n"
        f"{option_sql}\n"
        "        )\n"
        "    );"
    )


def configure_client(conn, args, api_key):
    """Configure rembed API client; exits with status 1 on failure."""
    insert_sql = build_client_sql(args, api_key)
    cursor = conn.cursor()
    try:
        cursor.execute(insert_sql)
        conn.commit()
        print(f"✓ Configured API client '{args.client_name}'")
    except Error as e:
        print(f"ERROR: Failed to configure API client: {e}")
        # Never echo the real key: show the statement with the key masked.
        redacted_sql = build_client_sql(args, '***')
        print(f"SQL: {redacted_sql[:200]}...")
        sys.exit(1)
    finally:
        cursor.close()


def _fetch_count(conn, sql):
    """Run a single-value COUNT query and return it as an int (0 if empty)."""
    cursor = conn.cursor()
    try:
        cursor.execute(sql)
        result = cursor.fetchone()
    finally:
        cursor.close()
    if result and result[0] is not None:
        return int(result[0])
    return 0


def get_remaining_count(conn):
    """Get count of eligible Posts without embeddings.

    Raises:
        Error: re-raised after logging when the query fails.
    """
    count_sql = f"""
    SELECT COUNT(*)
    FROM Posts
    LEFT JOIN Posts_embeddings ON Posts.rowid = Posts_embeddings.rowid
    WHERE {ELIGIBLE_SQL}
      AND Posts_embeddings.rowid IS NULL;
    """
    try:
        return _fetch_count(conn, count_sql)
    except Error as e:
        print(f"ERROR: Failed to count remaining rows: {e}")
        raise


def get_total_posts(conn):
    """Get total number of eligible Posts (PostTypeId 1,2 with text length > 30).

    Raises:
        Error: re-raised after logging when the query fails.
    """
    total_sql = f"""
    SELECT COUNT(*)
    FROM Posts
    WHERE {ELIGIBLE_SQL};
    """
    try:
        return _fetch_count(conn, total_sql)
    except Error as e:
        print(f"ERROR: Failed to count total Posts: {e}")
        raise


def build_batch_sql(client_name, batch_size):
    """Build the INSERT ... SELECT that embeds up to *batch_size* pending rows."""
    return f"""
    INSERT OR REPLACE INTO Posts_embeddings(rowid, embedding)
    SELECT Posts.rowid, rembed({sql_quote(client_name)},
           {EMBED_TEXT_SQL}) as embedding
    FROM Posts
    LEFT JOIN Posts_embeddings ON Posts.rowid = Posts_embeddings.rowid
    WHERE {ELIGIBLE_SQL}
      AND Posts_embeddings.rowid IS NULL
    LIMIT {int(batch_size)};
    """


def process_batch(conn, args):
    """Process a batch of unembedded Posts.

    Returns:
        ``(rowcount, None)`` on success, where ``rowcount`` is whatever the
        driver reports for the statement, or ``(0, error_message)`` on failure.
    """
    cursor = conn.cursor()
    try:
        cursor.execute(build_batch_sql(args.client_name, args.batch_size))
        conn.commit()
        return cursor.rowcount, None
    except Error as e:
        return 0, str(e)
    finally:
        cursor.close()


def backoff_delay(retry_delay, consecutive_failures, cap=MAX_BACKOFF_SECONDS):
    """Exponential backoff: ``retry_delay * 2**(failures-1)``, capped at *cap*.

    The exponent is clamped so a very long failure streak does not build an
    enormous integer before the cap is applied.
    """
    exponent = min(max(consecutive_failures - 1, 0), 32)
    return min(retry_delay * (2 ** exponent), cap)


def main():
    """Main processing loop."""
    args = parse_args()
    api_key = check_env(args)

    print("=" * 60)
    print("Posts Table Embeddings Processor")
    print("=" * 60)
    print(f"Host: {args.host}:{args.port}")
    print(f"Database: {args.database}")
    print(f"API Client: {args.client_name}")
    print(f"Batch Size: {args.batch_size}")
    if args.local_ollama:
        print("Mode: Local Ollama")
        print(f"URL: {OLLAMA_URL}")
        print(f"Model: {OLLAMA_MODEL}")
    else:
        print("Mode: Remote API")
        print(f"API URL: {args.api_url}")
        print(f"Model: {args.api_model}")
    print("=" * 60)

    # Connect to database
    conn = connect_db(args)

    try:
        # Configure API client
        configure_client(conn, args, api_key)

        # Get initial counts
        try:
            total_posts = get_total_posts(conn)
            remaining = get_remaining_count(conn)
        except Error as e:
            print(f"ERROR: Failed to get initial counts: {e}")
            sys.exit(1)

        print("\nInitial status:")
        print(f"  Total Posts: {total_posts}")
        print(f"  Already embedded: {total_posts - remaining}")
        print(f"  Remaining: {remaining}")
        print("-" * 40)

        if remaining == 0:
            print("✓ All Posts already have embeddings. Nothing to do.")
            sys.exit(0)

        # Main processing loop. Progress is derived from the remaining-row
        # count rather than cursor.rowcount, so the numbers stay correct even
        # if the server reports an unreliable affected-row count or another
        # process is embedding rows concurrently.
        iteration = 0
        consecutive_failures = 0
        stalled_batches = 0

        while remaining > 0:
            iteration += 1
            embedded = max(total_posts - remaining, 0)

            # Show progress
            if total_posts > 0:
                progress_str = f" ({(embedded / total_posts) * 100:.1f}%)"
            else:
                progress_str = ""
            print(f"\nIteration {iteration}:")
            print(f"  Remaining: {remaining}")
            print(f"  Processed: {embedded}/{total_posts}{progress_str}")

            # Process batch
            _, error = process_batch(conn, args)

            if error:
                consecutive_failures += 1
                delay = backoff_delay(args.retry_delay, consecutive_failures)
                print(f"  ✗ Batch failed: {error}")
                print(f"  Consecutive failures: {consecutive_failures}")
                print(f"  Waiting {delay} seconds before retry...")
                time.sleep(delay)
                continue

            # Reset consecutive failures on any successful operation
            consecutive_failures = 0

            previous_remaining = remaining
            try:
                remaining = get_remaining_count(conn)
            except Error as e:
                print(f"ERROR: Failed to get remaining count: {e}")
                sys.exit(1)

            newly_embedded = previous_remaining - remaining
            if newly_embedded > 0:
                stalled_batches = 0
                print(f"  ✓ Processed {newly_embedded} rows")
                # Continue immediately (no delay on success)
                continue

            stalled_batches += 1
            print("  ⓘ No rows processed (possibly concurrent process?)")
            if stalled_batches >= MAX_STALLED_BATCHES:
                # Without this guard a batch that "succeeds" but embeds
                # nothing would spin forever.
                print(f"ERROR: No progress after {stalled_batches} consecutive "
                      f"batches; {remaining} rows still lack embeddings")
                sys.exit(1)
            # Small delay if no rows processed (could be race condition)
            time.sleep(1)

        print(f"\n✓ All {total_posts} Posts have embeddings!")

        # Final summary
        embedded = max(total_posts - remaining, 0)
        print("\n" + "=" * 60)
        print("Processing Complete!")
        print(f"Total Posts: {total_posts}")
        print(f"Total with embeddings: {embedded}")
        if total_posts > 0:
            success_percent = (embedded / total_posts) * 100
            print(f"Success rate: {success_percent:.1f}%")
        else:
            print("Success rate: N/A (no posts)")
        print("=" * 60)
    except KeyboardInterrupt:
        print("\nInterrupted by user.")
        sys.exit(130)
    finally:
        # Runs on every exit path, including sys.exit() and Ctrl-C.
        conn.close()


if __name__ == "__main__":
    main()