Commit a81a1175 authored by Lucas Charles's avatar Lucas Charles

Add nginx throughput to WAF Anomaly service

Fetch aggregate stats for display on WAF statistics page to correlate
against WAF anomalies (done in separate MR)

Part of https://gitlab.com/gitlab-org/gitlab/issues/14707
parent fa7387b7
......@@ -4,6 +4,7 @@ module Clusters
module Applications
class Ingress < ApplicationRecord
VERSION = '1.29.7'
INGRESS_CONTAINER_NAME = 'nginx-ingress-controller'
MODSECURITY_LOG_CONTAINER_NAME = 'modsecurity-log'
self.table_name = 'clusters_applications_ingress'
......@@ -69,7 +70,7 @@ module Clusters
end
def ingress_service
cluster.kubeclient.get_service('ingress-nginx-ingress-controller', Gitlab::Kubernetes::Helm::NAMESPACE)
cluster.kubeclient.get_service("ingress-#{INGRESS_CONTAINER_NAME}", Gitlab::Kubernetes::Helm::NAMESPACE)
end
private
......@@ -123,7 +124,7 @@ module Clusters
{
"name" => "modsecurity-template-volume",
"configMap" => {
"name" => "ingress-nginx-ingress-controller",
"name" => "ingress-#{INGRESS_CONTAINER_NAME}",
"items" => [
{
"key" => "modsecurity.conf",
......
---
title: Add nginx request aggregations to WAF anomaly service
merge_request: 25273
author:
type: added
......@@ -15,11 +15,18 @@ module Security
def execute
return if elasticsearch_client.nil?
# Use multi-search with single query as we'll be adding nginx later
# with https://gitlab.com/gitlab-org/gitlab/issues/14707
aggregate_results = elasticsearch_client.msearch(body: body)
nginx_results = aggregate_results['responses'].first
nginx_total_requests = nginx_results.dig('hits', 'total').to_f
{
total_traffic: 0,
total_traffic: nginx_total_requests,
anomalous_traffic: 0.0,
history: {
nominal: [],
nominal: histogram_from(nginx_results),
anomalous: []
},
interval: @interval,
......@@ -32,5 +39,104 @@ module Security
def elasticsearch_client
@client ||= @environment.deployment_platform.cluster.application_elastic_stack&.elasticsearch_client
end
private
def body
[
{ index: indices },
{
query: nginx_requests_query,
aggs: aggregations(@interval),
size: 0 # no docs needed, only counts
}
]
end
# Construct a list of daily indices to be searched. We do this programmatically
# based on the requested timeframe to reduce the load of querying all previous
# indices
def indices
(@from.to_date..@to.to_date).map do |day|
"filebeat-*-#{day.strftime('%Y.%m.%d')}"
end
end
def nginx_requests_query
{
bool: {
must: [
{
range: {
'@timestamp' => {
gte: @from,
lte: @to
}
}
},
{
terms_set: {
message: {
terms: environment_proxy_upstream_name_tokens,
minimum_should_match_script: {
source: 'params.num_terms'
}
}
}
},
{
match_phrase: {
'kubernetes.container.name' => {
query: ::Clusters::Applications::Ingress::INGRESS_CONTAINER_NAME
}
}
},
{
match_phrase: {
'kubernetes.namespace' => {
query: Gitlab::Kubernetes::Helm::NAMESPACE
}
}
},
{
match_phrase: {
stream: {
query: 'stdout'
}
}
}
]
}
}
end
def aggregations(interval)
{
counts: {
date_histogram: {
field: '@timestamp',
interval: interval,
order: {
'_key': 'asc'
}
}
}
}
end
def histogram_from(results)
buckets = results.dig('aggregations', 'counts', 'buckets') || []
buckets.map { |bucket| [bucket['key_as_string'], bucket['doc_count']] }
end
# Derive proxy upstream name to filter nginx log by environment
# See https://kubernetes.github.io/ingress-nginx/user-guide/nginx-configuration/log-format/
def environment_proxy_upstream_name_tokens
[
*@environment.deployment_namespace.split('-'),
@environment.slug # $RELEASE_NAME
]
end
end
end
......@@ -8,6 +8,7 @@ describe Projects::Security::WafAnomaliesController do
let_it_be(:project) { create(:project, :public, :repository, group: group) }
let_it_be(:environment) { create(:environment, :with_review_app, project: project) }
let_it_be(:cluster) { create(:cluster, :provided_by_gcp, environment_scope: '*', projects: [environment.project]) }
let_it_be(:action_params) { { project_id: project, namespace_id: project.namespace, environment_id: environment } }
......@@ -22,7 +23,7 @@ describe Projects::Security::WafAnomaliesController do
sign_in(user)
allow_next_instance_of(::Security::WafAnomalySummaryService) do |instance|
allow(instance).to receive(:elasticsearch_client).at_most(:twice) { es_client }
allow(instance).to receive(:elasticsearch_client).at_most(3).times { es_client }
end
end
......
......@@ -10,11 +10,27 @@ describe Security::WafAnomalySummaryService do
let(:es_client) { double(Elasticsearch::Client) }
let(:nginx_response) do
empty_response.deep_merge(
"hits" => { "total" => 3 },
"aggregations" => {
"counts" => {
"buckets" => [
{ "key_as_string" => "2020-02-14T23:00:00.000Z", "key" => 1575500400000, "doc_count" => 1 },
{ "key_as_string" => "2020-02-15T00:00:00.000Z", "key" => 1575504000000, "doc_count" => 0 },
{ "key_as_string" => "2020-02-15T01:00:00.000Z", "key" => 1575507600000, "doc_count" => 0 },
{ "key_as_string" => "2020-02-15T08:00:00.000Z", "key" => 1575532800000, "doc_count" => 2 }
]
}
}
)
end
let(:empty_response) do
{
"took" => 40,
"timed_out" => false,
"_shards" => { "total" => 1, "successful" => 1, "skipped" => 0, "failed" => 0 },
"_shards" => { "total" => 11, "successful" => 11, "skipped" => 0, "failed" => 0 },
"hits" => { "total" => 0, "max_score" => 0.0, "hits" => [] },
"aggregations" => {
"counts" => {
......@@ -58,6 +74,87 @@ describe Security::WafAnomalySummaryService do
expect(results.fetch(:anomalous_traffic)).to eq 0.0
end
end
context 'no violations' do
let(:nginx_results) { nginx_response }
let(:modsec_results) { empty_response }
it 'returns results' do
results = subject.execute
expect(results.fetch(:status)).to eq :success
expect(results.fetch(:interval)).to eq 'day'
expect(results.fetch(:total_traffic)).to eq 3
expect(results.fetch(:anomalous_traffic)).to eq 0.0
end
end
end
context 'with time window' do
it 'passes time frame to ElasticSearch' do
from = 1.day.ago
to = Time.now
subject = described_class.new(
environment: environment,
from: from,
to: to
)
allow(subject).to receive(:elasticsearch_client) { es_client }
expect(es_client).to receive(:msearch).with(
body: array_including(
hash_including(
query: hash_including(
bool: hash_including(
must: array_including(
hash_including(
range: hash_including(
'@timestamp' => {
gte: from,
lte: to
}
)
)
)
)
)
)
)
).and_return({ 'responses' => [{}] })
subject.execute
end
end
context 'with interval' do
it 'passes interval to ElasticSearch' do
interval = 'hour'
subject = described_class.new(
environment: environment,
interval: interval
)
allow(subject).to receive(:elasticsearch_client) { es_client }
expect(es_client).to receive(:msearch).with(
body: array_including(
hash_including(
aggs: hash_including(
counts: hash_including(
date_histogram: hash_including(
interval: interval
)
)
)
)
)
).and_return({ 'responses' => [{}] })
subject.execute
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment