Commit 5b4fe788 authored by Sean McGivern's avatar Sean McGivern

API endpoint to remove jobs from Sidekiq based on metadata

This API endpoint uses job metadata to remove jobs from a queue. It can
only be used by admins, and deletes as many jobs as it can in 30
seconds. If it exceeds 30 seconds, it returns a flag indicating that the
user should try again to finish processing the queue.

(Because of the way Sidekiq queues work, it can't resume where it left
off, so you just have to start from scratch each time.)

By default, it's implemented in GraphQL, but there's also a matching
REST endpoint for convenience.
parent 330b7d5e
# frozen_string_literal: true
module Mutations
module Admin
module SidekiqQueues
class DeleteJobs < BaseMutation
graphql_name 'AdminSidekiqQueuesDeleteJobs'
ADMIN_MESSAGE = 'You must be an admin to use this mutation'
Labkit::Context::KNOWN_KEYS.each do |key|
argument key,
GraphQL::STRING_TYPE,
required: false,
description: "Delete jobs matching #{key} in the context metadata"
end
argument :queue_name,
GraphQL::STRING_TYPE,
required: true,
description: 'The name of the queue to delete jobs from'
field :result,
Types::Admin::SidekiqQueues::DeleteJobsResponseType,
null: true,
description: 'Information about the status of the deletion request'
def ready?(**args)
unless current_user&.admin?
raise Gitlab::Graphql::Errors::ResourceNotAvailable, ADMIN_MESSAGE
end
super
end
def resolve(args)
{
result: Gitlab::SidekiqQueue.new(args[:queue_name]).drop_jobs!(args, timeout: 30),
errors: []
}
rescue Gitlab::SidekiqQueue::NoMetadataError
{
result: nil,
errors: ['No metadata provided']
}
rescue Gitlab::SidekiqQueue::InvalidQueueError
raise Gitlab::Graphql::Errors::ResourceNotAvailable, "Queue #{args[:queue_name]} not found"
end
end
end
end
end
# frozen_string_literal: true
module Types
module Admin
module SidekiqQueues
# We can't authorize against the value passed to this because it's
# a plain hash.
class DeleteJobsResponseType < BaseObject # rubocop:disable Graphql/AuthorizeTypes
graphql_name 'DeleteJobsResponse'
description 'The response from the AdminSidekiqQueuesDeleteJobs mutation.'
field :completed,
GraphQL::BOOLEAN_TYPE,
null: true,
description: 'Whether or not the entire queue was processed in time; if not, retrying the same request is safe'
field :deleted_jobs,
GraphQL::INT_TYPE,
null: true,
description: 'The number of matching jobs deleted'
field :queue_size,
GraphQL::INT_TYPE,
null: true,
description: 'The queue size after processing'
end
end
end
end
...@@ -6,6 +6,7 @@ module Types ...@@ -6,6 +6,7 @@ module Types
graphql_name 'Mutation' graphql_name 'Mutation'
mount_mutation Mutations::Admin::SidekiqQueues::DeleteJobs
mount_mutation Mutations::AwardEmojis::Add mount_mutation Mutations::AwardEmojis::Add
mount_mutation Mutations::AwardEmojis::Remove mount_mutation Mutations::AwardEmojis::Remove
mount_mutation Mutations::AwardEmojis::Toggle mount_mutation Mutations::AwardEmojis::Toggle
......
---
title: Add admin API endpoint to delete Sidekiq jobs matching metadata
merge_request: 25998
author:
type: added
# Admin Sidekiq queues API
> **Note:** This feature was [introduced](https://gitlab.com/gitlab-org/gitlab/-/merge_requests/25998) in GitLab 12.9
Delete jobs from a Sidekiq queue that match the given
[metadata](../development/logging.md#logging-context-metadata-through-rails-or-grape-requests).
The response has three fields:
1. `deleted_jobs` - the number of jobs deleted by the request.
1. `queue_size` - the remaining size of the queue after processing the
request.
1. `completed` - whether or not the request was able to process the
entire queue in time. If not, retrying with the same parameters may
delete further jobs (including those added after the first request
was issued).
This API endpoint is only available to admin users.
```
DELETE /admin/sidekiq/queues/:queue_name
```
| Attribute | Type | Required | Description |
| --------- | -------------- | -------- | ----------- |
| `queue_name` | string | yes | The name of the queue to delete jobs from |
| `user` | string | no | The username of the user who scheduled the jobs |
| `project` | string | no | The full path of the project where the jobs were scheduled from |
| `root_namespace` | string | no | The root namespace of the project |
| `subscription_plan` | string | no | The subscription plan of the root namespace (GitLab.com only) |
| `caller_id` | string | no | The endpoint or background job that schedule the job (for example: `ProjectsController#create`, `/api/:version/projects/:id`, `PostReceive`) |
At least one attribute, other than `queue_name`, is required.
```shell
curl --header "PRIVATE-TOKEN: <your_access_token>" https://gitlab.example.com/api/v4/admin/sidekiq/queues/authorized_projects?user=root
```
Example response:
```json
{
"completed": true,
"deleted_jobs": 7,
"queue_size": 14
}
```
...@@ -106,7 +106,8 @@ The following API resources are available in the group context: ...@@ -106,7 +106,8 @@ The following API resources are available in the group context:
The following API resources are available outside of project and group contexts (including `/users`): The following API resources are available outside of project and group contexts (including `/users`):
| Resource | Available endpoints | | Resource | Available endpoints |
|:--------------------------------------------------|:------------------------------------------------------------------------| |:---------------------------------------------------|:------------------------------------------------------------------------|
| [Admin Sidekiq queues](admin_sidekiq_queues.md) | `/admin/sidekiq/queues/:queue_name` |
| [Appearance](appearance.md) **(CORE ONLY)** | `/application/appearance` | | [Appearance](appearance.md) **(CORE ONLY)** | `/application/appearance` |
| [Applications](applications.md) | `/applications` | | [Applications](applications.md) | `/applications` |
| [Audit Events](audit_events.md) **(PREMIUM ONLY)** | `/audit_events` | | [Audit Events](audit_events.md) **(PREMIUM ONLY)** | `/audit_events` |
......
...@@ -38,6 +38,66 @@ type AddAwardEmojiPayload { ...@@ -38,6 +38,66 @@ type AddAwardEmojiPayload {
errors: [String!]! errors: [String!]!
} }
"""
Autogenerated input type of AdminSidekiqQueuesDeleteJobs
"""
input AdminSidekiqQueuesDeleteJobsInput {
"""
Delete jobs matching caller_id in the context metadata
"""
callerId: String
"""
A unique identifier for the client performing the mutation.
"""
clientMutationId: String
"""
Delete jobs matching project in the context metadata
"""
project: String
"""
The name of the queue to delete jobs from
"""
queueName: String!
"""
Delete jobs matching root_namespace in the context metadata
"""
rootNamespace: String
"""
Delete jobs matching subscription_plan in the context metadata
"""
subscriptionPlan: String
"""
Delete jobs matching user in the context metadata
"""
user: String
}
"""
Autogenerated return type of AdminSidekiqQueuesDeleteJobs
"""
type AdminSidekiqQueuesDeleteJobsPayload {
"""
A unique identifier for the client performing the mutation.
"""
clientMutationId: String
"""
Reasons why the mutation failed.
"""
errors: [String!]!
"""
Information about the status of the deletion request
"""
result: DeleteJobsResponse
}
""" """
An emoji awarded by a user. An emoji awarded by a user.
""" """
...@@ -601,6 +661,26 @@ type CreateSnippetPayload { ...@@ -601,6 +661,26 @@ type CreateSnippetPayload {
snippet: Snippet snippet: Snippet
} }
"""
The response from the AdminSidekiqQueuesDeleteJobs mutation.
"""
type DeleteJobsResponse {
"""
Whether or not the entire queue was processed in time; if not, retrying the same request is safe
"""
completed: Boolean
"""
The number of matching jobs deleted
"""
deletedJobs: Int
"""
The queue size after processing
"""
queueSize: Int
}
""" """
A single design A single design
""" """
...@@ -4772,6 +4852,7 @@ enum MoveType { ...@@ -4772,6 +4852,7 @@ enum MoveType {
type Mutation { type Mutation {
addAwardEmoji(input: AddAwardEmojiInput!): AddAwardEmojiPayload addAwardEmoji(input: AddAwardEmojiInput!): AddAwardEmojiPayload
adminSidekiqQueuesDeleteJobs(input: AdminSidekiqQueuesDeleteJobsInput!): AdminSidekiqQueuesDeleteJobsPayload
createDiffNote(input: CreateDiffNoteInput!): CreateDiffNotePayload createDiffNote(input: CreateDiffNoteInput!): CreateDiffNotePayload
createEpic(input: CreateEpicInput!): CreateEpicPayload createEpic(input: CreateEpicInput!): CreateEpicPayload
createImageDiffNote(input: CreateImageDiffNoteInput!): CreateImageDiffNotePayload createImageDiffNote(input: CreateImageDiffNoteInput!): CreateImageDiffNotePayload
......
...@@ -19106,6 +19106,33 @@ ...@@ -19106,6 +19106,33 @@
"isDeprecated": false, "isDeprecated": false,
"deprecationReason": null "deprecationReason": null
}, },
{
"name": "adminSidekiqQueuesDeleteJobs",
"description": null,
"args": [
{
"name": "input",
"description": null,
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": {
"kind": "INPUT_OBJECT",
"name": "AdminSidekiqQueuesDeleteJobsInput",
"ofType": null
}
},
"defaultValue": null
}
],
"type": {
"kind": "OBJECT",
"name": "AdminSidekiqQueuesDeleteJobsPayload",
"ofType": null
},
"isDeprecated": false,
"deprecationReason": null
},
{ {
"name": "createDiffNote", "name": "createDiffNote",
"description": null, "description": null,
...@@ -19978,6 +20005,213 @@ ...@@ -19978,6 +20005,213 @@
"enumValues": null, "enumValues": null,
"possibleTypes": null "possibleTypes": null
}, },
{
"kind": "OBJECT",
"name": "AdminSidekiqQueuesDeleteJobsPayload",
"description": "Autogenerated return type of AdminSidekiqQueuesDeleteJobs",
"fields": [
{
"name": "clientMutationId",
"description": "A unique identifier for the client performing the mutation.",
"args": [
],
"type": {
"kind": "SCALAR",
"name": "String",
"ofType": null
},
"isDeprecated": false,
"deprecationReason": null
},
{
"name": "errors",
"description": "Reasons why the mutation failed.",
"args": [
],
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": {
"kind": "LIST",
"name": null,
"ofType": {
"kind": "NON_NULL",
"name": null,
"ofType": {
"kind": "SCALAR",
"name": "String",
"ofType": null
}
}
}
},
"isDeprecated": false,
"deprecationReason": null
},
{
"name": "result",
"description": "Information about the status of the deletion request",
"args": [
],
"type": {
"kind": "OBJECT",
"name": "DeleteJobsResponse",
"ofType": null
},
"isDeprecated": false,
"deprecationReason": null
}
],
"inputFields": null,
"interfaces": [
],
"enumValues": null,
"possibleTypes": null
},
{
"kind": "OBJECT",
"name": "DeleteJobsResponse",
"description": "The response from the AdminSidekiqQueuesDeleteJobs mutation.",
"fields": [
{
"name": "completed",
"description": "Whether or not the entire queue was processed in time; if not, retrying the same request is safe",
"args": [
],
"type": {
"kind": "SCALAR",
"name": "Boolean",
"ofType": null
},
"isDeprecated": false,
"deprecationReason": null
},
{
"name": "deletedJobs",
"description": "The number of matching jobs deleted",
"args": [
],
"type": {
"kind": "SCALAR",
"name": "Int",
"ofType": null
},
"isDeprecated": false,
"deprecationReason": null
},
{
"name": "queueSize",
"description": "The queue size after processing",
"args": [
],
"type": {
"kind": "SCALAR",
"name": "Int",
"ofType": null
},
"isDeprecated": false,
"deprecationReason": null
}
],
"inputFields": null,
"interfaces": [
],
"enumValues": null,
"possibleTypes": null
},
{
"kind": "INPUT_OBJECT",
"name": "AdminSidekiqQueuesDeleteJobsInput",
"description": "Autogenerated input type of AdminSidekiqQueuesDeleteJobs",
"fields": null,
"inputFields": [
{
"name": "user",
"description": "Delete jobs matching user in the context metadata",
"type": {
"kind": "SCALAR",
"name": "String",
"ofType": null
},
"defaultValue": null
},
{
"name": "project",
"description": "Delete jobs matching project in the context metadata",
"type": {
"kind": "SCALAR",
"name": "String",
"ofType": null
},
"defaultValue": null
},
{
"name": "rootNamespace",
"description": "Delete jobs matching root_namespace in the context metadata",
"type": {
"kind": "SCALAR",
"name": "String",
"ofType": null
},
"defaultValue": null
},
{
"name": "subscriptionPlan",
"description": "Delete jobs matching subscription_plan in the context metadata",
"type": {
"kind": "SCALAR",
"name": "String",
"ofType": null
},
"defaultValue": null
},
{
"name": "callerId",
"description": "Delete jobs matching caller_id in the context metadata",
"type": {
"kind": "SCALAR",
"name": "String",
"ofType": null
},
"defaultValue": null
},
{
"name": "queueName",
"description": "The name of the queue to delete jobs from",
"type": {
"kind": "NON_NULL",
"name": null,
"ofType": {
"kind": "SCALAR",
"name": "String",
"ofType": null
}
},
"defaultValue": null
},
{
"name": "clientMutationId",
"description": "A unique identifier for the client performing the mutation.",
"type": {
"kind": "SCALAR",
"name": "String",
"ofType": null
},
"defaultValue": null
}
],
"interfaces": null,
"enumValues": null,
"possibleTypes": null
},
{ {
"kind": "OBJECT", "kind": "OBJECT",
"name": "AddAwardEmojiPayload", "name": "AddAwardEmojiPayload",
......
...@@ -26,6 +26,16 @@ Autogenerated return type of AddAwardEmoji ...@@ -26,6 +26,16 @@ Autogenerated return type of AddAwardEmoji
| `clientMutationId` | String | A unique identifier for the client performing the mutation. | | `clientMutationId` | String | A unique identifier for the client performing the mutation. |
| `errors` | String! => Array | Reasons why the mutation failed. | | `errors` | String! => Array | Reasons why the mutation failed. |
## AdminSidekiqQueuesDeleteJobsPayload
Autogenerated return type of AdminSidekiqQueuesDeleteJobs
| Name | Type | Description |
| --- | ---- | ---------- |
| `clientMutationId` | String | A unique identifier for the client performing the mutation. |
| `errors` | String! => Array | Reasons why the mutation failed. |
| `result` | DeleteJobsResponse | Information about the status of the deletion request |
## AwardEmoji ## AwardEmoji
An emoji awarded by a user. An emoji awarded by a user.
...@@ -129,6 +139,16 @@ Autogenerated return type of CreateSnippet ...@@ -129,6 +139,16 @@ Autogenerated return type of CreateSnippet
| `errors` | String! => Array | Reasons why the mutation failed. | | `errors` | String! => Array | Reasons why the mutation failed. |
| `snippet` | Snippet | The snippet after mutation | | `snippet` | Snippet | The snippet after mutation |
## DeleteJobsResponse
The response from the AdminSidekiqQueuesDeleteJobs mutation.
| Name | Type | Description |
| --- | ---- | ---------- |
| `completed` | Boolean | Whether or not the entire queue was processed in time; if not, retrying the same request is safe |
| `deletedJobs` | Int | The number of matching jobs deleted |
| `queueSize` | Int | The queue size after processing |
## Design ## Design
A single design A single design
......
# frozen_string_literal: true
module API
module Admin
class Sidekiq < Grape::API
before { authenticated_as_admin! }
namespace 'admin' do
namespace 'sidekiq' do
namespace 'queues' do
desc 'Drop jobs matching the given metadata from the Sidekiq queue'
params do
Labkit::Context::KNOWN_KEYS.each do |key|
optional key, type: String, allow_blank: false
end
at_least_one_of(*Labkit::Context::KNOWN_KEYS)
end
delete ':queue_name' do
result =
Gitlab::SidekiqQueue
.new(params[:queue_name])
.drop_jobs!(declared_params, timeout: 30)
present result
rescue Gitlab::SidekiqQueue::NoMetadataError
render_api_error!("Invalid metadata: #{declared_params}", 400)
rescue Gitlab::SidekiqQueue::InvalidQueueError
not_found!(params[:queue_name])
end
end
end
end
end
end
end
...@@ -110,6 +110,7 @@ module API ...@@ -110,6 +110,7 @@ module API
# Keep in alphabetical order # Keep in alphabetical order
mount ::API::AccessRequests mount ::API::AccessRequests
mount ::API::Admin::Sidekiq
mount ::API::Appearance mount ::API::Appearance
mount ::API::Applications mount ::API::Applications
mount ::API::Avatar mount ::API::Avatar
......
# frozen_string_literal: true
module Gitlab
class SidekiqQueue
include Gitlab::Utils::StrongMemoize
NoMetadataError = Class.new(StandardError)
InvalidQueueError = Class.new(StandardError)
attr_reader :queue_name
def initialize(queue_name)
@queue_name = queue_name
end
def drop_jobs!(search_metadata, timeout:)
completed = false
deleted_jobs = 0
job_search_metadata =
search_metadata
.stringify_keys
.slice(*Labkit::Context::KNOWN_KEYS)
.transform_keys { |key| "meta.#{key}" }
.compact
raise NoMetadataError if job_search_metadata.empty?
raise InvalidQueueError unless queue
begin
Timeout.timeout(timeout) do
queue.each do |job|
next unless job_matches?(job, job_search_metadata)
job.delete
deleted_jobs += 1
end
completed = true
end
rescue Timeout::Error
end
{
completed: completed,
deleted_jobs: deleted_jobs,
queue_size: queue.size
}
end
def queue
strong_memoize(:queue) do
# Sidekiq::Queue.new always returns a queue, even if it doesn't
# exist.
Sidekiq::Queue.all.find { |queue| queue.name == queue_name }
end
end
def job_matches?(job, job_search_metadata)
job_search_metadata.all? { |key, value| job[key] == value }
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::SidekiqQueue do
around do |example|
Sidekiq::Queue.new('authorized_projects').clear
Sidekiq::Testing.disable!(&example)
Sidekiq::Queue.new('authorized_projects').clear
end
def add_job(user)
Sidekiq::Client.push(
'class' => 'AuthorizedProjectsWorker',
'queue' => 'authorized_projects',
'args' => [user.id],
'meta.user' => user.username
)
end
describe '#drop_jobs!' do
shared_examples 'queue processing' do
let(:sidekiq_queue) { described_class.new('authorized_projects') }
let_it_be(:sidekiq_queue_user) { create(:user) }
before do
add_job(create(:user))
add_job(sidekiq_queue_user)
add_job(sidekiq_queue_user)
end
context 'when the queue is not processed in time' do
before do
calls = 0
allow(sidekiq_queue).to receive(:job_matches?).and_wrap_original do |m, *args|
raise Timeout::Error if calls > 0
calls += 1
m.call(*args)
end
end
it 'returns a non-completion flag, the number of jobs deleted, and the remaining queue size' do
expect(sidekiq_queue.drop_jobs!(search_metadata, timeout: 10))
.to eq(completed: false,
deleted_jobs: timeout_deleted,
queue_size: 3 - timeout_deleted)
end
end
context 'when the queue is processed in time' do
it 'returns a completion flag, the number of jobs deleted, and the remaining queue size' do
expect(sidekiq_queue.drop_jobs!(search_metadata, timeout: 10))
.to eq(completed: true,
deleted_jobs: no_timeout_deleted,
queue_size: 3 - no_timeout_deleted)
end
end
end
context 'when there are no matching jobs' do
include_examples 'queue processing' do
let(:search_metadata) { { project: 1 } }
let(:timeout_deleted) { 0 }
let(:no_timeout_deleted) { 0 }
end
end
context 'when there are matching jobs' do
include_examples 'queue processing' do
let(:search_metadata) { { user: sidekiq_queue_user.username } }
let(:timeout_deleted) { 1 }
let(:no_timeout_deleted) { 2 }
end
end
context 'when there are no valid metadata keys passed' do
it 'raises NoMetadataError' do
add_job(create(:user))
expect { described_class.new('authorized_projects').drop_jobs!({ username: 'sidekiq_queue_user' }, timeout: 1) }
.to raise_error(described_class::NoMetadataError)
end
end
context 'when the queue does not exist' do
it 'raises InvalidQueueError' do
expect { described_class.new('foo').drop_jobs!({ user: 'sidekiq_queue_user' }, timeout: 1) }
.to raise_error(described_class::InvalidQueueError)
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe API::Admin::Sidekiq do
let_it_be(:admin) { create(:admin) }
describe 'DELETE /admin/sidekiq/queues/:queue_name' do
context 'when the user is not an admin' do
it 'returns a 403' do
delete api("/admin/sidekiq/queues/authorized_projects?user=#{admin.username}", create(:user))
expect(response).to have_gitlab_http_status(:forbidden)
end
end
context 'when the user is an admin' do
around do |example|
Sidekiq::Queue.new('authorized_projects').clear
Sidekiq::Testing.disable!(&example)
Sidekiq::Queue.new('authorized_projects').clear
end
def add_job(user)
Sidekiq::Client.push(
'class' => 'AuthorizedProjectsWorker',
'queue' => 'authorized_projects',
'args' => [user.id],
'meta.user' => user.username
)
end
context 'valid request' do
it 'returns info about the deleted jobs' do
add_job(admin)
add_job(admin)
add_job(create(:user))
delete api("/admin/sidekiq/queues/authorized_projects?user=#{admin.username}", admin)
expect(response).to have_gitlab_http_status(:ok)
expect(json_response).to eq('completed' => true,
'deleted_jobs' => 2,
'queue_size' => 1)
end
end
context 'when no required params are provided' do
it 'returns a 400' do
delete api("/admin/sidekiq/queues/authorized_projects?user_2=#{admin.username}", admin)
expect(response).to have_gitlab_http_status(:bad_request)
end
end
context 'when the queue does not exist' do
it 'returns a 404' do
delete api("/admin/sidekiq/queues/authorized_projects_2?user=#{admin.username}", admin)
expect(response).to have_gitlab_http_status(:not_found)
end
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe 'Deleting Sidekiq jobs' do
include GraphqlHelpers
let_it_be(:admin) { create(:admin) }
let(:variables) { { user: admin.username, queue_name: 'authorized_projects' } }
let(:mutation) { graphql_mutation(:admin_sidekiq_queues_delete_jobs, variables) }
def mutation_response
graphql_mutation_response(:admin_sidekiq_queues_delete_jobs)
end
context 'when the user is not an admin' do
let(:current_user) { create(:user) }
it_behaves_like 'a mutation that returns top-level errors',
errors: ['You must be an admin to use this mutation']
end
context 'when the user is an admin' do
let(:current_user) { admin }
context 'valid request' do
around do |example|
Sidekiq::Queue.new('authorized_projects').clear
Sidekiq::Testing.disable!(&example)
Sidekiq::Queue.new('authorized_projects').clear
end
def add_job(user)
Sidekiq::Client.push(
'class' => 'AuthorizedProjectsWorker',
'queue' => 'authorized_projects',
'args' => [user.id],
'meta.user' => user.username
)
end
it 'returns info about the deleted jobs' do
add_job(admin)
add_job(admin)
add_job(create(:user))
post_graphql_mutation(mutation, current_user: admin)
expect(mutation_response['errors']).to be_empty
expect(mutation_response['result']).to eq('completed' => true,
'deletedJobs' => 2,
'queueSize' => 1)
end
end
context 'when no required params are provided' do
let(:variables) { { queue_name: 'authorized_projects' } }
it_behaves_like 'a mutation that returns errors in the response',
errors: ['No metadata provided']
end
context 'when the queue does not exist' do
let(:variables) { { user: admin.username, queue_name: 'authorized_projects_2' } }
it_behaves_like 'a mutation that returns top-level errors',
errors: ['Queue authorized_projects_2 not found']
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment