Commit b786df7b authored by Alishan Ladhani's avatar Alishan Ladhani

Avoid sending bulk requests larger than bulk_limit_bytes

Send a smaller bulk request if adding another item would
go over bulk limit
parent ccd3b3fe
---
title: Reduce 413 errors when making bulk indexing requests to Elasticsearch
merge_request: 31653
author:
type: fixed
...@@ -35,10 +35,12 @@ module Gitlab ...@@ -35,10 +35,12 @@ module Gitlab
else else
delete(ref) delete(ref)
end end
self
end end
def flush def flush
maybe_send_bulk(force: true).failures send_bulk.failures
end end
private private
...@@ -56,8 +58,6 @@ module Gitlab ...@@ -56,8 +58,6 @@ module Gitlab
op = build_op(ref, proxy) op = build_op(ref, proxy)
submit({ index: op }, proxy.as_indexed_json) submit({ index: op }, proxy.as_indexed_json)
maybe_send_bulk
end end
def delete(ref) def delete(ref)
...@@ -65,8 +65,6 @@ module Gitlab ...@@ -65,8 +65,6 @@ module Gitlab
op = build_op(ref, proxy) op = build_op(ref, proxy)
submit(delete: op) submit(delete: op)
maybe_send_bulk
end end
def build_op(ref, proxy) def build_op(ref, proxy)
...@@ -86,17 +84,27 @@ module Gitlab ...@@ -86,17 +84,27 @@ module Gitlab
end end
def submit(*hashes) def submit(*hashes)
hashes.each do |hash| jsons = hashes.map(&:to_json)
text = hash.to_json bytesize = calculate_bytesize(jsons)
send_bulk if will_exceed_bulk_limit?(bytesize)
body.push(text) body.concat(jsons)
@body_size_bytes += text.bytesize + 2 # Account for newlines @body_size_bytes += bytesize
end end
def calculate_bytesize(jsons)
jsons.reduce(0) do |sum, json|
sum + json.bytesize + 2 # Account for newlines
end
end
def will_exceed_bulk_limit?(bytesize)
body_size_bytes + bytesize > bulk_limit_bytes
end end
def maybe_send_bulk(force: false) def send_bulk
return self if body.empty? return self if body.empty?
return self if body_size_bytes < bulk_limit_bytes && !force
failed_refs = try_send_bulk failed_refs = try_send_bulk
......
...@@ -31,16 +31,31 @@ describe Gitlab::Elastic::BulkIndexer, :elastic do ...@@ -31,16 +31,31 @@ describe Gitlab::Elastic::BulkIndexer, :elastic do
indexer.process(issue_as_ref) indexer.process(issue_as_ref)
end end
it 'sends a bulk request if the max bulk request size is reached' do it 'sends the action and source in the same request' do
set_bulk_limit(indexer, 1) set_bulk_limit(indexer, 1)
indexer.process(issue_as_ref)
allow(es_client).to receive(:bulk).and_return({})
indexer.process(issue_as_ref)
expect(es_client) expect(es_client)
.to receive(:bulk) .to have_received(:bulk)
.with(body: [kind_of(String), kind_of(String)]) .with(body: [kind_of(String), kind_of(String)])
.and_return({}) expect(indexer.failures).to be_empty
end
it 'sends a bulk request before adding an item that exceeds the bulk limit' do
bulk_limit_bytes = (issue_as_json_with_times.to_json.bytesize * 1.5).to_i
set_bulk_limit(indexer, bulk_limit_bytes)
indexer.process(issue_as_ref) indexer.process(issue_as_ref)
allow(es_client).to receive(:bulk).and_return({})
indexer.process(issue_as_ref)
expect(es_client).to have_received(:bulk) do |args|
body_bytesize = args[:body].map(&:bytesize).reduce(:+)
expect(body_bytesize).to be <= bulk_limit_bytes
end
expect(indexer.failures).to be_empty expect(indexer.failures).to be_empty
end end
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment