Commit 6a79b349 authored by Sergei Golubchik's avatar Sergei Golubchik

mhnsw: return an error if lazy neighbor read failed

parent bcbbf333
......@@ -74,6 +74,7 @@ class MHNSW_Context
size_t vec_len= 0;
size_t byte_len= 0;
FVector *target= 0;
uint err= 0;
Hash_set<FVectorNode> node_cache{PSI_INSTRUMENT_MEM, FVectorNode::get_key};
......@@ -142,8 +143,8 @@ float FVectorNode::distance_to(const FVector &other) const
int FVectorNode::instantiate_vector()
{
DBUG_ASSERT(vec == nullptr);
if (int err= ctx->table->file->ha_rnd_pos(ctx->table->record[0], ref))
return err;
if ((ctx->err= ctx->table->file->ha_rnd_pos(ctx->table->record[0], ref)))
return ctx->err;
String buf, *v= ctx->vec_field->val_str(&buf);
if (unlikely(ctx->byte_len == 0))
{
......@@ -173,13 +174,13 @@ int FVectorNode::instantiate_neighbors(size_t layer)
graph->field[0]->store(layer, false);
graph->field[1]->store_binary(ref, ref_len);
key_copy(key, graph->record[0], graph->key_info, graph->key_info->key_length);
if (int err= graph->file->ha_index_read_map(graph->record[0], key,
HA_WHOLE_KEY, HA_READ_KEY_EXACT))
return err;
if ((ctx->err= graph->file->ha_index_read_map(graph->record[0], key,
HA_WHOLE_KEY, HA_READ_KEY_EXACT)))
return ctx->err;
String strbuf, *str= graph->field[2]->val_str(&strbuf);
if (str->length() % ref_len)
return HA_ERR_CRASHED; // should not happen, corrupted HNSW index
return ctx->err= HA_ERR_CRASHED; // corrupted HNSW index
for (const char *pos= str->ptr(); pos < str->end(); pos+= ref_len)
neighbors[layer].push_back(ctx->get_node(pos), &ctx->root);
......@@ -249,11 +250,15 @@ static int select_neighbors(MHNSW_Context *ctx, size_t layer,
*/
List<FVectorNode> candidates= candidates_unsafe;
List<FVectorNode> &neighbors= target.get_neighbors(layer);
if (ctx->err)
return ctx->err;
neighbors.empty();
if (pq.init(10000, 0, cmp_vec, &target) ||
pq_discard.init(10000, 0, cmp_vec, &target))
return HA_ERR_OUT_OF_MEM;
return ctx->err= HA_ERR_OUT_OF_MEM;
for (const FVectorNode &candidate : candidates)
{
......@@ -293,10 +298,12 @@ static int select_neighbors(MHNSW_Context *ctx, size_t layer,
static int write_neighbors(MHNSW_Context *ctx, size_t layer,
const FVectorNode &source_node)
{
int err;
TABLE *graph= ctx->table->hlindex;
const List<FVectorNode> &new_neighbors= source_node.get_neighbors(layer);
if (ctx->err)
return ctx->err;
size_t total_size= new_neighbors.elements * source_node.get_ref_len();
// Allocate memory for the struct and the flexible array member
......@@ -315,23 +322,23 @@ static int write_neighbors(MHNSW_Context *ctx, size_t layer,
graph->field[2]->store_binary(neighbor_array_bytes, total_size);
if (source_node.is_new())
err= graph->file->ha_write_row(graph->record[0]);
ctx->err= graph->file->ha_write_row(graph->record[0]);
else
{
uchar *key= static_cast<uchar*>(alloca(graph->key_info->key_length));
key_copy(key, graph->record[0], graph->key_info, graph->key_info->key_length);
err= graph->file->ha_index_read_map(graph->record[1], key,
ctx->err= graph->file->ha_index_read_map(graph->record[1], key,
HA_WHOLE_KEY, HA_READ_KEY_EXACT);
if (!err)
if (!ctx->err)
{
err= graph->file->ha_update_row(graph->record[1], graph->record[0]);
if (err == HA_ERR_RECORD_IS_THE_SAME)
err= 0;
ctx->err= graph->file->ha_update_row(graph->record[1], graph->record[0]);
if (ctx->err == HA_ERR_RECORD_IS_THE_SAME)
ctx->err= 0;
}
}
my_safe_afree(neighbor_array_bytes, total_size);
return err;
return ctx->err;
}
......@@ -341,18 +348,19 @@ static int update_second_degree_neighbors(MHNSW_Context *ctx, size_t layer,
{
for (const FVectorNode &neigh: node.get_neighbors(layer))
{
neigh.get_neighbors(layer).push_back(&node, &ctx->root);
if (neigh.get_neighbors(layer).elements > max_neighbors)
List<FVectorNode> &neighneighbors= neigh.get_neighbors(layer);
if (ctx->err)
return ctx->err;
neighneighbors.push_back(&node, &ctx->root);
if (neighneighbors.elements > max_neighbors)
{
if (int err= select_neighbors(ctx, layer, neigh,
neigh.get_neighbors(layer), max_neighbors))
return err;
if (select_neighbors(ctx, layer, neigh, neighneighbors, max_neighbors))
return ctx->err;
}
if (int err= write_neighbors(ctx, layer, neigh))
return err;
if (write_neighbors(ctx, layer, neigh))
return ctx->err;
}
return 0;
return ctx->err;
}
......@@ -360,8 +368,8 @@ static int update_neighbors(MHNSW_Context *ctx, size_t layer,
uint max_neighbors, const FVectorNode &node)
{
// 1. update node's neighbors
if (int err= write_neighbors(ctx, layer, node))
return err;
if (write_neighbors(ctx, layer, node))
return ctx->err;
// 2. update node's neighbors' neighbors (shrink before update)
return update_second_degree_neighbors(ctx, layer, max_neighbors, node);
}
......@@ -428,7 +436,7 @@ static int search_layer(MHNSW_Context *ctx,
while (best.elements())
result->push_front(best.pop(), &ctx->root);
return 0;
return ctx->err;
}
......@@ -482,10 +490,11 @@ int mhnsw_insert(TABLE *table, KEY *keyinfo)
SCOPE_EXIT([graph](){ graph->file->ha_index_end(); });
if (int err= graph->file->ha_index_last(graph->record[0]))
if ((ctx.err= graph->file->ha_index_last(graph->record[0])))
{
if (err != HA_ERR_END_OF_FILE)
return err;
if (ctx.err != HA_ERR_END_OF_FILE)
return ctx.err;
ctx.err= 0;
// First insert!
FVectorNode target(&ctx, table->file->ref);
......@@ -519,16 +528,16 @@ int mhnsw_insert(TABLE *table, KEY *keyinfo)
if (new_node_layer > max_layer)
{
if (int err= write_neighbors(&ctx, max_layer + 1, target))
return err;
if (write_neighbors(&ctx, max_layer + 1, target))
return ctx.err;
new_node_layer= max_layer;
}
else
{
for (longlong cur_layer= max_layer; cur_layer > new_node_layer; cur_layer--)
{
if (int err= search_layer(&ctx, start_nodes, 1, cur_layer, &candidates))
return err;
if (search_layer(&ctx, start_nodes, 1, cur_layer, &candidates))
return ctx.err;
start_nodes= candidates;
candidates.empty();
}
......@@ -539,15 +548,14 @@ int mhnsw_insert(TABLE *table, KEY *keyinfo)
uint max_neighbors= (cur_layer == 0) // heuristics from the paper
? thd->variables.mhnsw_max_edges_per_node * 2
: thd->variables.mhnsw_max_edges_per_node;
if (int err= search_layer(&ctx, start_nodes, ef_construction, cur_layer,
&candidates))
return err;
if (int err= select_neighbors(&ctx, cur_layer, target, candidates,
max_neighbors))
return err;
if (int err= update_neighbors(&ctx, cur_layer, max_neighbors, target))
return err;
if (search_layer(&ctx, start_nodes, ef_construction, cur_layer,
&candidates))
return ctx.err;
if (select_neighbors(&ctx, cur_layer, target, candidates, max_neighbors))
return ctx.err;
if (update_neighbors(&ctx, cur_layer, max_neighbors, target))
return ctx.err;
start_nodes= candidates;
candidates.empty();
}
......@@ -576,8 +584,8 @@ int mhnsw_first(TABLE *table, KEY *keyinfo, Item *dist, ulonglong limit)
SCOPE_EXIT([graph](){ graph->file->ha_index_end(); });
if (int err= graph->file->ha_index_last(graph->record[0]))
return err;
if ((ctx.err= graph->file->ha_index_last(graph->record[0])))
return ctx.err;
longlong max_layer= graph->field[0]->val_int();
......@@ -610,14 +618,14 @@ int mhnsw_first(TABLE *table, KEY *keyinfo, Item *dist, ulonglong limit)
for (size_t cur_layer= max_layer; cur_layer > 0; cur_layer--)
{
if (int err= search_layer(&ctx, start_nodes, 1, cur_layer, &candidates))
return err;
if (search_layer(&ctx, start_nodes, 1, cur_layer, &candidates))
return ctx.err;
start_nodes= candidates;
candidates.empty();
}
if (int err= search_layer(&ctx, start_nodes, ef_search, 0, &candidates))
return err;
if (search_layer(&ctx, start_nodes, ef_search, 0, &candidates))
return ctx.err;
size_t context_size= limit * h->ref_length + sizeof(ulonglong);
char *context= thd->alloc(context_size);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment