Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE subgraphs.deployment DROP COLUMN postponed_indexes_created;
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
ALTER TABLE subgraphs.deployment
ADD COLUMN postponed_indexes_created BOOLEAN NOT NULL DEFAULT FALSE;

-- Existing synced deployments already had their postponed indexes created
-- eagerly under the old code path; mark them done so we don't try again.
UPDATE subgraphs.deployment
SET postponed_indexes_created = TRUE
WHERE synced_at IS NOT NULL;
77 changes: 46 additions & 31 deletions store/postgres/src/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,51 @@ impl CopyState {
fn all_tables(&self) -> impl Iterator<Item = &TableState> {
self.finished.iter().chain(self.unfinished.iter())
}

/// Create the indexes that were postponed at the start of the
/// copy/graft operation: first the ones that existed in the source
/// subgraph, then any indexes for new fields that don't exist in the
/// source.
async fn create_indexes(
&self,
conn: &mut AsyncPgConnection,
index_list: &IndexList,
) -> Result<(), StoreError> {
let creat = self.dst.index_creator(false, true);
let dst_nsp = self.dst.site.namespace.to_string();

// Create the indexes that existed in the source subgraph
for table in self.all_tables() {
let idxs = index_list
.indexes_for_table(&table.dst)
.filter(|idx| idx.to_postpone())
.map(|idx| idx.with_nsp(dst_nsp.clone()))
.collect::<Result<Vec<_>, _>>()?;

creat.execute_many(conn, &idxs).await?;
}

// Create indexes for new fields that don't exist in the source
for table in self.all_tables() {
let src_columns: HashSet<&str> =
table.src.columns.iter().map(|c| c.name.as_str()).collect();
let new_idxs: Vec<_> = table
.dst
.indexes(&self.dst.input_schema)
.map_err(|_| internal_error!("failed to generate indexes for copy"))?
.into_iter()
.filter(|idx| idx.to_postpone() && idx.references_column_not_in(&src_columns))
.collect();
creat.execute_many(conn, &new_idxs).await?;
}

conn.transaction(async |conn| {
deployment::set_postponed_indexes_created(conn, &self.dst.site).await
})
.await?;

Ok(())
}
}

pub(crate) async fn source(
Expand Down Expand Up @@ -1237,37 +1282,7 @@ impl Connection {
}
debug_assert!(self.conn.is_some());

// Create indexes for all the attributes that were postponed at the start of
// the copy/graft operations.
// First recreate the indexes that existed in the original subgraph.
let creat = self.dst.index_creator(false, true);
for table in state.all_tables() {
let dst_nsp = self.dst.site.namespace.to_string();
let idxs = index_list
.indexes_for_table(&table.dst)
.filter(|idx| idx.to_postpone())
.map(|idx| idx.with_nsp(dst_nsp.clone()))
.collect::<Result<Vec<_>, _>>()?;

let conn = self.get_conn()?;
creat.execute_many(conn, &idxs).await?;
}

// Second create the indexes for the new fields that don't exist in
// the source.
for table in state.all_tables() {
let src_columns: HashSet<&str> =
table.src.columns.iter().map(|c| c.name.as_str()).collect();
let new_idxs: Vec<_> = table
.dst
.indexes(&self.dst.input_schema)
.map_err(|_| internal_error!("failed to generate indexes for copy"))?
.into_iter()
.filter(|idx| idx.to_postpone() && idx.references_column_not_in(&src_columns))
.collect();
let conn = self.get_conn()?;
creat.execute_many(conn, &new_idxs).await?;
}
state.create_indexes(self.get_conn()?, &index_list).await?;

self.copy_private_data_sources(&state).await?;

Expand Down
38 changes: 38 additions & 0 deletions store/postgres/src/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ table! {

synced_at -> Nullable<Timestamptz>,
synced_at_block_number -> Nullable<Int4>,

/// `true` once the postponed (deferred) attribute indexes have been
/// created. Persisted so that we create them exactly once even
/// across restarts, and so that we don't recreate indexes that an
/// external process removed because they were unused.
postponed_indexes_created -> Bool,
}
}

Expand Down Expand Up @@ -832,6 +838,38 @@ pub async fn set_synced(
Ok(())
}

/// Returns whether the postponed (deferred) attribute indexes have already
/// been created for this deployment.
pub async fn postponed_indexes_created(
conn: &mut AsyncPgConnection,
site: &Site,
) -> Result<bool, StoreError> {
use deployment as d;

let created = d::table
.filter(d::id.eq(site.id))
.select(d::postponed_indexes_created)
.first::<bool>(conn)
.await?;
Ok(created)
}

/// Mark the postponed (deferred) attribute indexes as created for this
/// deployment. Once set, we never recreate these indexes — even if an
/// external process removes them.
pub async fn set_postponed_indexes_created(
conn: &mut AsyncPgConnection,
site: &Site,
) -> Result<(), StoreError> {
use deployment as d;

update(d::table.filter(d::id.eq(site.id)))
.set(d::postponed_indexes_created.eq(true))
.execute(conn)
.await?;
Ok(())
}

/// Returns `true` if the deployment (as identified by `site.id`)
pub async fn exists(conn: &mut AsyncPgConnection, site: &Site) -> Result<bool, StoreError> {
use deployment as d;
Expand Down
22 changes: 13 additions & 9 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1669,11 +1669,6 @@ impl DeploymentStore {
.await?;
}

// Create any indexes whose creation was postponed when the
// deployment was first created. Using `IF NOT EXISTS` and
// `CONCURRENTLY` makes this safe to call on every restart.
self.create_postponed_indexes(site.cheap_clone()).await?;

// Make sure the block pointer is set. This is important for newly
// deployed subgraphs so that we respect the 'startBlock' setting
// the first time the subgraph is started
Expand All @@ -1686,13 +1681,20 @@ impl DeploymentStore {
}

/// Create all indexes whose creation was postponed when the
/// deployment was first created. Using `IF NOT EXISTS` and
/// `CONCURRENTLY` makes this safe to call even when some or all
/// indexes already exist.
/// deployment was first created. The decision is persisted in the
/// `postponed_indexes_created` flag on `subgraphs.deployment` so that
/// this is a one-shot operation: indexes are created exactly once,
/// and we never recreate them — even if an external process removes
/// indexes that it considers unused.
pub(crate) async fn create_postponed_indexes(&self, site: Arc<Site>) -> Result<(), StoreError> {
let layout = self.find_layout(site).await?;
let layout = self.find_layout(site.cheap_clone()).await?;
let creat = layout.index_creator(true, true);
let mut conn = self.pool.get_permitted().await?;

if deployment::postponed_indexes_created(&mut conn, &site).await? {
return Ok(());
}

for table in layout.tables.values() {
let indexes = table.indexes(&layout.input_schema).map_err(|e| {
StoreError::ConstraintViolation(format!("failed to generate indexes: {}", e))
Expand All @@ -1703,6 +1705,8 @@ impl DeploymentStore {
}
}
}

deployment::set_postponed_indexes_created(&mut conn, &site).await?;
Ok(())
}

Expand Down
7 changes: 1 addition & 6 deletions store/postgres/src/writable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1604,7 +1604,6 @@ pub struct WritableStore {

// Cached to avoid querying the database.
is_deployment_synced: AtomicBool,
postponed_indexes_created: AtomicBool,
}

impl WritableStore {
Expand Down Expand Up @@ -1646,7 +1645,6 @@ impl WritableStore {
block_cursor,
writer,
is_deployment_synced: AtomicBool::new(is_deployment_synced),
postponed_indexes_created: AtomicBool::new(false),
})
}

Expand Down Expand Up @@ -1892,10 +1890,7 @@ impl WritableStoreTrait for WritableStore {
}

async fn create_postponed_indexes(&self) -> Result<(), StoreError> {
if !self.postponed_indexes_created.swap(true, Ordering::SeqCst) {
self.store.create_postponed_indexes().await?;
}
Ok(())
self.store.create_postponed_indexes().await
}

async fn flush(&self) -> Result<(), StoreError> {
Expand Down
Loading