Commit 544b1ea6 authored by Illya Klymov's avatar Illya Klymov

Implement import status endpoint polling

- use fresh added endpoint for status
parent 77dfaba7
...@@ -25,6 +25,14 @@ export function createResolvers({ endpoints }) { ...@@ -25,6 +25,14 @@ export function createResolvers({ endpoints }) {
data: { availableNamespaces }, data: { availableNamespaces },
} = await client.query({ query: availableNamespacesQuery }); } = await client.query({ query: availableNamespacesQuery });
if (!statusPoller) {
statusPoller = new StatusPoller({
client,
pollPath: endpoints.jobs,
});
statusPoller.startPolling();
}
return axios return axios
.get(endpoints.status, { .get(endpoints.status, {
params: { params: {
...@@ -83,7 +91,7 @@ export function createResolvers({ endpoints }) { ...@@ -83,7 +91,7 @@ export function createResolvers({ endpoints }) {
const group = groupManager.findById(sourceGroupId); const group = groupManager.findById(sourceGroupId);
groupManager.setImportStatus(group, STATUSES.SCHEDULING); groupManager.setImportStatus(group, STATUSES.SCHEDULING);
try { try {
await axios.post(endpoints.createBulkImport, { const response = await axios.post(endpoints.createBulkImport, {
bulk_import: [ bulk_import: [
{ {
source_type: 'group_entity', source_type: 'group_entity',
...@@ -94,10 +102,7 @@ export function createResolvers({ endpoints }) { ...@@ -94,10 +102,7 @@ export function createResolvers({ endpoints }) {
], ],
}); });
groupManager.setImportStatus(group, STATUSES.STARTED); groupManager.setImportStatus(group, STATUSES.STARTED);
if (!statusPoller) { SourceGroupsManager.attachImportId(group, response.data.id);
statusPoller = new StatusPoller({ client, interval: 3000 });
statusPoller.startPolling();
}
} catch (e) { } catch (e) {
createFlash({ createFlash({
message: s__('BulkImport|Importing the group failed'), message: s__('BulkImport|Importing the group failed'),
......
...@@ -14,6 +14,12 @@ function generateGroupId(id) { ...@@ -14,6 +14,12 @@ function generateGroupId(id) {
} }
export class SourceGroupsManager { export class SourceGroupsManager {
static importMap = new Map();
static attachImportId(group, importId) {
SourceGroupsManager.importMap.set(importId, group.id);
}
constructor({ client }) { constructor({ client }) {
this.client = client; this.client = client;
} }
...@@ -36,6 +42,10 @@ export class SourceGroupsManager { ...@@ -36,6 +42,10 @@ export class SourceGroupsManager {
this.update(group, fn); this.update(group, fn);
} }
findByImportId(importId) {
return this.findById(SourceGroupsManager.importMap.get(importId));
}
setImportStatus(group, status) { setImportStatus(group, status) {
this.update(group, (sourceGroup) => { this.update(group, (sourceGroup) => {
// eslint-disable-next-line no-param-reassign // eslint-disable-next-line no-param-reassign
......
import gql from 'graphql-tag'; import Visibility from 'visibilityjs';
import createFlash from '~/flash'; import createFlash from '~/flash';
import axios from '~/lib/utils/axios_utils';
import Poll from '~/lib/utils/poll';
import { s__ } from '~/locale'; import { s__ } from '~/locale';
import { STATUSES } from '../../../constants';
import bulkImportSourceGroupsQuery from '../queries/bulk_import_source_groups.query.graphql';
import { SourceGroupsManager } from './source_groups_manager'; import { SourceGroupsManager } from './source_groups_manager';
const groupId = (i) => `group${i}`;
function generateGroupsQuery(groups) {
return gql`{
${groups
.map(
(g, idx) =>
`${groupId(idx)}: group(fullPath: "${g.import_target.target_namespace}/${
g.import_target.new_name
}") { id }`,
)
.join('\n')}
}`;
}
export class StatusPoller { export class StatusPoller {
constructor({ client, interval }) { constructor({ client, pollPath }) {
this.client = client; this.client = client;
this.interval = interval;
this.timeoutId = null;
this.groupManager = new SourceGroupsManager({ client });
}
startPolling() { this.eTagPoll = new Poll({
if (this.timeoutId) { resource: {
return; fetchJobs: () => axios.get(pollPath),
} },
method: 'fetchJobs',
successCallback: ({ data }) => this.updateImportsStatuses(data),
errorCallback: () =>
createFlash({
message: s__('BulkImport|Update of import statuses with realtime changes failed'),
}),
});
Visibility.change(() => {
if (!Visibility.hidden()) {
this.eTagPoll.restart();
} else {
this.eTagPoll.stop();
}
});
this.checkPendingImports(); this.groupManager = new SourceGroupsManager({ client });
} }
stopPolling() { startPolling() {
clearTimeout(this.timeoutId); this.eTagPoll.makeRequest();
this.timeoutId = null;
} }
async checkPendingImports() { async updateImportsStatuses(importStatuses) {
try { importStatuses.forEach(({ id, status_name: statusName }) => {
const { bulkImportSourceGroups } = this.client.readQuery({ const group = this.groupManager.findByImportId(id);
query: bulkImportSourceGroupsQuery, if (group.id) {
}); this.groupManager.setImportStatus(group, statusName);
const groupsInProgress = bulkImportSourceGroups.nodes.filter(
(g) => g.status === STATUSES.STARTED,
);
if (groupsInProgress.length) {
const { data: results } = await this.client.query({
query: generateGroupsQuery(groupsInProgress),
fetchPolicy: 'no-cache',
});
const completedGroups = groupsInProgress.filter((_, idx) => Boolean(results[groupId(idx)]));
completedGroups.forEach((group) => {
this.groupManager.setImportStatus(group, STATUSES.FINISHED);
});
} }
} catch (e) { });
createFlash({
message: s__('BulkImport|Update of import statuses with realtime changes failed'),
});
} finally {
this.timeoutId = setTimeout(() => this.checkPendingImports(), this.interval);
}
} }
} }
...@@ -14,6 +14,7 @@ export function mountImportGroupsApp(mountElement) { ...@@ -14,6 +14,7 @@ export function mountImportGroupsApp(mountElement) {
statusPath, statusPath,
availableNamespacesPath, availableNamespacesPath,
createBulkImportPath, createBulkImportPath,
jobsPath,
sourceUrl, sourceUrl,
} = mountElement.dataset; } = mountElement.dataset;
const apolloProvider = new VueApollo({ const apolloProvider = new VueApollo({
...@@ -22,6 +23,7 @@ export function mountImportGroupsApp(mountElement) { ...@@ -22,6 +23,7 @@ export function mountImportGroupsApp(mountElement) {
status: statusPath, status: statusPath,
availableNamespaces: availableNamespacesPath, availableNamespaces: availableNamespacesPath,
createBulkImport: createBulkImportPath, createBulkImport: createBulkImportPath,
jobs: jobsPath,
}, },
}), }),
}); });
......
...@@ -37,9 +37,8 @@ class Import::BulkImportsController < ApplicationController ...@@ -37,9 +37,8 @@ class Import::BulkImportsController < ApplicationController
end end
def create def create
BulkImportService.new(current_user, create_params, credentials).execute result = BulkImportService.new(current_user, create_params, credentials).execute
render json: result.to_json(only: [:id])
render json: :ok
end end
def realtime_changes def realtime_changes
......
...@@ -38,6 +38,8 @@ class BulkImportService ...@@ -38,6 +38,8 @@ class BulkImportService
bulk_import = create_bulk_import bulk_import = create_bulk_import
BulkImportWorker.perform_async(bulk_import.id) BulkImportWorker.perform_async(bulk_import.id)
bulk_import
end end
private private
......
...@@ -8,4 +8,5 @@ ...@@ -8,4 +8,5 @@
#import-groups-mount-element{ data: { status_path: status_import_bulk_imports_path(format: :json), #import-groups-mount-element{ data: { status_path: status_import_bulk_imports_path(format: :json),
available_namespaces_path: import_available_namespaces_path(format: :json), available_namespaces_path: import_available_namespaces_path(format: :json),
create_bulk_import_path: import_bulk_imports_path(format: :json), create_bulk_import_path: import_bulk_imports_path(format: :json),
jobs_path: realtime_changes_import_bulk_imports_path(format: :json),
source_url: @source_url } } source_url: @source_url } }
---
title: Use realtime_changes endpoint for reporting group import status
merge_request: 52796
author:
type: changed
...@@ -185,6 +185,7 @@ RSpec.describe Import::BulkImportsController do ...@@ -185,6 +185,7 @@ RSpec.describe Import::BulkImportsController do
describe 'POST create' do describe 'POST create' do
let(:instance_url) { "http://fake-intance" } let(:instance_url) { "http://fake-intance" }
let(:bulk_import) { create(:bulk_import) }
let(:pat) { "fake-pat" } let(:pat) { "fake-pat" }
before do before do
...@@ -201,12 +202,13 @@ RSpec.describe Import::BulkImportsController do ...@@ -201,12 +202,13 @@ RSpec.describe Import::BulkImportsController do
expect_next_instance_of( expect_next_instance_of(
BulkImportService, user, bulk_import_params, { url: instance_url, access_token: pat }) do |service| BulkImportService, user, bulk_import_params, { url: instance_url, access_token: pat }) do |service|
expect(service).to receive(:execute) allow(service).to receive(:execute).and_return(bulk_import)
end end
post :create, params: { bulk_import: bulk_import_params } post :create, params: { bulk_import: bulk_import_params }
expect(response).to have_gitlab_http_status(:ok) expect(response).to have_gitlab_http_status(:ok)
expect(response.body).to eq({ id: bulk_import.id }.to_json)
end end
end end
end end
......
...@@ -28,6 +28,7 @@ const FAKE_ENDPOINTS = { ...@@ -28,6 +28,7 @@ const FAKE_ENDPOINTS = {
status: '/fake_status_url', status: '/fake_status_url',
availableNamespaces: '/fake_available_namespaces', availableNamespaces: '/fake_available_namespaces',
createBulkImport: '/fake_create_bulk_import', createBulkImport: '/fake_create_bulk_import',
jobs: '/fake_jobs',
}; };
describe('Bulk import resolvers', () => { describe('Bulk import resolvers', () => {
...@@ -109,6 +110,11 @@ describe('Bulk import resolvers', () => { ...@@ -109,6 +110,11 @@ describe('Bulk import resolvers', () => {
), ),
).toBe(true); ).toBe(true);
}); });
it('starts polling when request completes', async () => {
const [statusPoller] = StatusPoller.mock.instances;
expect(statusPoller.startPolling).toHaveBeenCalled();
});
}); });
it.each` it.each`
...@@ -215,7 +221,7 @@ describe('Bulk import resolvers', () => { ...@@ -215,7 +221,7 @@ describe('Bulk import resolvers', () => {
}); });
it('sets group status to STARTED when request completes', async () => { it('sets group status to STARTED when request completes', async () => {
axiosMockAdapter.onPost(FAKE_ENDPOINTS.createBulkImport).reply(httpStatus.OK); axiosMockAdapter.onPost(FAKE_ENDPOINTS.createBulkImport).reply(httpStatus.OK, { id: 1 });
await client.mutate({ await client.mutate({
mutation: importGroupMutation, mutation: importGroupMutation,
variables: { sourceGroupId: GROUP_ID }, variables: { sourceGroupId: GROUP_ID },
...@@ -224,16 +230,6 @@ describe('Bulk import resolvers', () => { ...@@ -224,16 +230,6 @@ describe('Bulk import resolvers', () => {
expect(results[0].status).toBe(STATUSES.STARTED); expect(results[0].status).toBe(STATUSES.STARTED);
}); });
it('starts polling when request completes', async () => {
axiosMockAdapter.onPost(FAKE_ENDPOINTS.createBulkImport).reply(httpStatus.OK);
await client.mutate({
mutation: importGroupMutation,
variables: { sourceGroupId: GROUP_ID },
});
const [statusPoller] = StatusPoller.mock.instances;
expect(statusPoller.startPolling).toHaveBeenCalled();
});
it('resets status to NONE if request fails', async () => { it('resets status to NONE if request fails', async () => {
axiosMockAdapter axiosMockAdapter
.onPost(FAKE_ENDPOINTS.createBulkImport) .onPost(FAKE_ENDPOINTS.createBulkImport)
......
import { InMemoryCache } from 'apollo-cache-inmemory'; import MockAdapter from 'axios-mock-adapter';
import { createMockClient } from 'mock-apollo-client'; import Visibility from 'visibilityjs';
import waitForPromises from 'helpers/wait_for_promises';
import createFlash from '~/flash'; import createFlash from '~/flash';
import { STATUSES } from '~/import_entities/constants'; import { STATUSES } from '~/import_entities/constants';
import { clientTypenames } from '~/import_entities/import_groups/graphql/client_factory';
import bulkImportSourceGroupsQuery from '~/import_entities/import_groups/graphql/queries/bulk_import_source_groups.query.graphql';
import { SourceGroupsManager } from '~/import_entities/import_groups/graphql/services/source_groups_manager'; import { SourceGroupsManager } from '~/import_entities/import_groups/graphql/services/source_groups_manager';
import { StatusPoller } from '~/import_entities/import_groups/graphql/services/status_poller'; import { StatusPoller } from '~/import_entities/import_groups/graphql/services/status_poller';
import { generateFakeEntry } from '../fixtures'; import axios from '~/lib/utils/axios_utils';
import Poll from '~/lib/utils/poll';
jest.mock('visibilityjs');
jest.mock('~/flash'); jest.mock('~/flash');
jest.mock('~/lib/utils/poll');
jest.mock('~/import_entities/import_groups/graphql/services/source_groups_manager', () => ({ jest.mock('~/import_entities/import_groups/graphql/services/source_groups_manager', () => ({
SourceGroupsManager: jest.fn().mockImplementation(function mock() { SourceGroupsManager: jest.fn().mockImplementation(function mock() {
this.setImportStatus = jest.fn(); this.setImportStatus = jest.fn();
this.findByImportId = jest.fn();
}), }),
})); }));
const TEST_POLL_INTERVAL = 1000; const FAKE_POLL_PATH = '/fake/poll/path';
const FAKE_PAGE_INFO = { page: 1, perPage: 20, total: 40, totalPages: 2 }; const CLIENT_MOCK = {};
describe('Bulk import status poller', () => { describe('Bulk import status poller', () => {
let poller; let poller;
let clientMock; let mockAdapter;
const listQueryCacheCalls = () => const getPollHistory = () => mockAdapter.history.get.filter((x) => x.url === FAKE_POLL_PATH);
clientMock.readQuery.mock.calls.filter((call) => call[0].query === bulkImportSourceGroupsQuery);
const generateFakeGroups = (statuses) =>
statuses.map((status, idx) => generateFakeEntry({ status, id: idx }));
const writeFakeGroupsQuery = (nodes) => {
clientMock.cache.writeQuery({
query: bulkImportSourceGroupsQuery,
data: {
bulkImportSourceGroups: {
__typename: clientTypenames.BulkImportSourceGroupConnection,
nodes,
pageInfo: {
__typename: clientTypenames.BulkImportPageInfo,
...FAKE_PAGE_INFO,
},
},
},
});
};
beforeEach(() => { beforeEach(() => {
clientMock = createMockClient({ mockAdapter = new MockAdapter(axios);
cache: new InMemoryCache({ mockAdapter.onGet(FAKE_POLL_PATH).reply(200, {});
fragmentMatcher: { match: () => true }, poller = new StatusPoller({ client: CLIENT_MOCK, pollPath: FAKE_POLL_PATH });
}), });
});
it('creates source group manager with proper client', () => {
jest.spyOn(clientMock, 'readQuery'); expect(SourceGroupsManager.mock.calls).toHaveLength(1);
const [[{ client }]] = SourceGroupsManager.mock.calls;
poller = new StatusPoller({ expect(client).toBe(CLIENT_MOCK);
client: clientMock, });
interval: TEST_POLL_INTERVAL,
}); it('creates poller with proper config', () => {
expect(Poll.mock.calls).toHaveLength(1);
const [[pollConfig]] = Poll.mock.calls;
expect(typeof pollConfig.method).toBe('string');
const pollOperation = pollConfig.resource[pollConfig.method];
expect(typeof pollOperation).toBe('function');
});
it('invokes axios when polling is performed', async () => {
const [[pollConfig]] = Poll.mock.calls;
const pollOperation = pollConfig.resource[pollConfig.method];
expect(getPollHistory()).toHaveLength(0);
pollOperation();
await axios.waitForAll();
expect(getPollHistory()).toHaveLength(1);
}); });
describe('general behavior', () => { it('subscribes to visibility changes', () => {
beforeEach(() => { expect(Visibility.change).toHaveBeenCalled();
writeFakeGroupsQuery([]);
});
it('does not perform polling when constructed', () => {
jest.runOnlyPendingTimers();
expect(listQueryCacheCalls()).toHaveLength(0);
});
it('immediately start polling when requested', async () => {
await poller.startPolling();
expect(listQueryCacheCalls()).toHaveLength(1);
});
it('constantly polls when started', async () => {
poller.startPolling();
expect(listQueryCacheCalls()).toHaveLength(1);
jest.advanceTimersByTime(TEST_POLL_INTERVAL);
expect(listQueryCacheCalls()).toHaveLength(2);
jest.advanceTimersByTime(TEST_POLL_INTERVAL);
expect(listQueryCacheCalls()).toHaveLength(3);
});
it('does not start polling when requested multiple times', async () => {
poller.startPolling();
expect(listQueryCacheCalls()).toHaveLength(1);
poller.startPolling();
expect(listQueryCacheCalls()).toHaveLength(1);
});
it('stops polling when requested', async () => {
poller.startPolling();
expect(listQueryCacheCalls()).toHaveLength(1);
poller.stopPolling();
jest.runOnlyPendingTimers();
expect(listQueryCacheCalls()).toHaveLength(1);
});
it('does not query server when list is empty', async () => {
jest.spyOn(clientMock, 'query');
poller.startPolling();
expect(clientMock.query).not.toHaveBeenCalled();
});
}); });
it('does not query server when no groups have STARTED status', async () => { it.each`
writeFakeGroupsQuery(generateFakeGroups([STATUSES.NONE, STATUSES.FINISHED])); isHidden | action
${true} | ${'stop'}
${false} | ${'restart'}
`('$action polling when hidden is $isHidden', ({ action, isHidden }) => {
const [pollInstance] = Poll.mock.instances;
const [[changeHandler]] = Visibility.change.mock.calls;
Visibility.hidden.mockReturnValue(isHidden);
expect(pollInstance[action]).not.toHaveBeenCalled();
changeHandler();
expect(pollInstance[action]).toHaveBeenCalled();
});
it('does not perform polling when constructed', async () => {
await axios.waitForAll();
expect(getPollHistory()).toHaveLength(0);
});
it('immediately start polling when requested', async () => {
const [pollInstance] = Poll.mock.instances;
jest.spyOn(clientMock, 'query');
poller.startPolling(); poller.startPolling();
expect(clientMock.query).not.toHaveBeenCalled();
expect(pollInstance.makeRequest).toHaveBeenCalled();
});
it('when error occurs shows flash with error', () => {
const [[pollConfig]] = Poll.mock.calls;
pollConfig.errorCallback();
expect(createFlash).toHaveBeenCalled();
}); });
describe('when there are groups which have STARTED status', () => { it('when success response arrives updates relevant group status', () => {
const TARGET_NAMESPACE = 'root'; const FAKE_ID = 5;
const [[pollConfig]] = Poll.mock.calls;
const STARTED_GROUP_1 = generateFakeEntry({ const [managerInstance] = SourceGroupsManager.mock.instances;
status: STATUSES.STARTED, managerInstance.findByImportId.mockReturnValue({ id: FAKE_ID });
id: 'started1',
}); pollConfig.successCallback({ data: [{ id: FAKE_ID, status_name: STATUSES.FINISHED }] });
const STARTED_GROUP_2 = generateFakeEntry({ expect(managerInstance.setImportStatus).toHaveBeenCalledWith(
status: STATUSES.STARTED, expect.objectContaining({ id: FAKE_ID }),
id: 'started2', STATUSES.FINISHED,
}); );
const NOT_STARTED_GROUP = generateFakeEntry({
status: STATUSES.NONE,
id: 'not_started',
});
it('query server only for groups with STATUSES.STARTED', async () => {
writeFakeGroupsQuery([STARTED_GROUP_1, NOT_STARTED_GROUP, STARTED_GROUP_2]);
clientMock.query = jest.fn().mockResolvedValue({ data: {} });
poller.startPolling();
expect(clientMock.query).toHaveBeenCalledTimes(1);
await waitForPromises();
const [[doc]] = clientMock.query.mock.calls;
const { selections } = doc.query.definitions[0].selectionSet;
expect(selections.every((field) => field.name.value === 'group')).toBeTruthy();
expect(selections).toHaveLength(2);
expect(selections.map((sel) => sel.arguments[0].value.value)).toStrictEqual([
`${TARGET_NAMESPACE}/${STARTED_GROUP_1.import_target.new_name}`,
`${TARGET_NAMESPACE}/${STARTED_GROUP_2.import_target.new_name}`,
]);
});
it('updates statuses only for groups in response', async () => {
writeFakeGroupsQuery([STARTED_GROUP_1, STARTED_GROUP_2]);
clientMock.query = jest.fn().mockResolvedValue({ data: { group0: {} } });
poller.startPolling();
await waitForPromises();
const [managerInstance] = SourceGroupsManager.mock.instances;
expect(managerInstance.setImportStatus).toHaveBeenCalledTimes(1);
expect(managerInstance.setImportStatus).toHaveBeenCalledWith(
expect.objectContaining({ id: STARTED_GROUP_1.id }),
STATUSES.FINISHED,
);
});
describe('when error occurs', () => {
beforeEach(() => {
writeFakeGroupsQuery([STARTED_GROUP_1, STARTED_GROUP_2]);
clientMock.query = jest.fn().mockRejectedValue(new Error('dummy error'));
poller.startPolling();
return waitForPromises();
});
it('reports an error', () => {
expect(createFlash).toHaveBeenCalled();
});
it('continues polling', async () => {
jest.advanceTimersByTime(TEST_POLL_INTERVAL);
expect(listQueryCacheCalls()).toHaveLength(2);
});
});
}); });
}); });
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment