Add selective sync support for the FDW queries to find job artifacts

parent 741d21c8
......@@ -45,10 +45,10 @@ module Geo
# rubocop: disable CodeReuse/ActiveRecord
def find_unsynced(batch_size:, except_artifact_ids: [])
relation =
if use_legacy_queries?
legacy_find_unsynced(except_artifact_ids: except_artifact_ids)
if use_legacy_queries_for_selective_sync?
legacy_finder.job_artifacts_unsynced(except_artifact_ids: except_artifact_ids)
else
fdw_find_unsynced(except_artifact_ids: except_artifact_ids)
job_artifacts_unsynced(except_artifact_ids: except_artifact_ids)
end
relation.limit(batch_size)
......@@ -58,10 +58,10 @@ module Geo
# rubocop: disable CodeReuse/ActiveRecord
def find_migrated_local(batch_size:, except_artifact_ids: [])
relation =
if use_legacy_queries?
legacy_find_migrated_local(except_artifact_ids: except_artifact_ids)
if use_legacy_queries_for_selective_sync?
legacy_finder.job_artifacts_migrated_local(except_artifact_ids: except_artifact_ids)
else
fdw_find_migrated_local(except_artifact_ids: except_artifact_ids)
job_artifacts_migrated_local(except_artifact_ids: except_artifact_ids)
end
relation.limit(batch_size)
......@@ -136,7 +136,7 @@ module Geo
end
end
def fdw_find_unsynced(except_artifact_ids:)
def job_artifacts_unsynced(except_artifact_ids:)
fdw_geo_node
.job_artifacts
.syncable
......@@ -144,32 +144,12 @@ module Geo
.id_not_in(except_artifact_ids)
end
def fdw_find_migrated_local(except_artifact_ids:)
def job_artifacts_migrated_local(except_artifact_ids:)
fdw_geo_node
.job_artifacts
.inner_join_job_artifact_registry
.with_files_stored_remotely
.id_not_in(except_artifact_ids)
end
def legacy_find_unsynced(except_artifact_ids:)
registry_artifact_ids = Geo::JobArtifactRegistry.pluck_artifact_key | except_artifact_ids
legacy_left_outer_join_registry_ids(
syncable,
registry_artifact_ids,
Ci::JobArtifact
)
end
def legacy_find_migrated_local(except_artifact_ids:)
registry_artifact_ids = Geo::JobArtifactRegistry.pluck_artifact_key - except_artifact_ids
legacy_inner_join_registry_ids(
current_node.job_artifacts.with_files_stored_remotely,
registry_artifact_ids,
Ci::JobArtifact
)
end
end
end
......@@ -30,6 +30,26 @@ module Geo
)
end
def job_artifacts_unsynced(except_artifact_ids: [])
registry_artifact_ids = Geo::JobArtifactRegistry.pluck_artifact_key | except_artifact_ids
legacy_left_outer_join_registry_ids(
syncable,
registry_artifact_ids,
Ci::JobArtifact
)
end
def job_artifacts_migrated_local(except_artifact_ids: [])
registry_artifact_ids = Geo::JobArtifactRegistry.pluck_artifact_key - except_artifact_ids
legacy_inner_join_registry_ids(
current_node.job_artifacts.with_files_stored_remotely,
registry_artifact_ids,
Ci::JobArtifact
)
end
def registries_for_job_artifacts
return Geo::JobArtifactRegistry.all unless selective_sync?
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment