Commit 7d4e5b72 authored by Ekaterina's avatar Ekaterina

change: plugin files [dev]

parent 4a4a6795
...@@ -19,14 +19,14 @@ ...@@ -19,14 +19,14 @@
# This plugin hooks into Fluentd in a way similiar to out_forward and # This plugin hooks into Fluentd in a way similiar to out_forward and
# out_secure_forward and outputs event stream to a Wendelin HTTP endpoint. # out_secure_forward and outputs event stream to a Wendelin HTTP endpoint.
require 'fluent/output' require 'fluent/plugin/output'
require_relative 'wendelin_client' require_relative 'wendelin_client'
module Fluent module Fluent::Plugin
class WendelinOutput < ObjectBufferedOutput # XXX verify base class class WendelinOutput < Output
Plugin.register_output('wendelin', self) Fluent::Plugin.register_output('wendelin', self)
# where Wendelin's Input Stream Tool is located, # where Wendelin's Input Stream Tool is located,
# ex http://example.com/erp5/portal_ingestion_policies/example_ingestion # ex http://example.com/erp5/portal_ingestion_policies/example_ingestion
...@@ -38,16 +38,32 @@ module Fluent ...@@ -38,16 +38,32 @@ module Fluent
config_param :user, :string, :default => nil config_param :user, :string, :default => nil
config_param :password, :string, :default => nil config_param :password, :string, :default => nil
config_param :use_keep_alive, :bool, :default => false
config_param :ssl_timeout, :integer, :default => 60
config_param :open_timeout, :integer, :default => 60
config_param :read_timeout, :integer, :default => 60
config_param :keep_alive_timeout, :integer, :default => 300
config_section :buffer do
config_set_default :chunk_keys, ["tag"]
end
def configure(conf) def configure(conf)
super super
unless @chunk_key_tag
raise Fluent::ConfigError, "buffer chunk key must include 'tag' for wendelin output"
end
credentials = {} credentials = {}
if @user if @user
credentials['user'] = @user credentials['user'] = @user
credentials['password'] = @password credentials['password'] = @password
end end
@wendelin = WendelinClient.new(@streamtool_uri, credentials, @log) @wendelin = WendelinClient.new(@streamtool_uri, credentials, @log,
@ssl_timeout, @open_timeout,
@read_timeout, @keep_alive_timeout)
end end
def start def start
...@@ -60,12 +76,11 @@ module Fluent ...@@ -60,12 +76,11 @@ module Fluent
# TODO # TODO
end end
# Use normal "Synchronous Buffer" - write out records from a buffer chunk for a tag.
# hooked to ObjectBufferedOutput - write out records from a buffer chunk for a tag.
# #
# NOTE this is called from a separate thread (see OutputThread and BufferedOutput) def write(chunk)
def write_objects(tag, chunk) return if chunk.empty?
# NOTE if this fail and raises -> it will unroll to BufferedOutput#try_flush # NOTE if this fail and raises -> it will unroll to Output#try_flush
# which detects errors and retries outputting logs up to retry maxcount # which detects errors and retries outputting logs up to retry maxcount
# times and aborts outputting current logs if all try fail. # times and aborts outputting current logs if all try fail.
# #
...@@ -78,11 +93,16 @@ module Fluent ...@@ -78,11 +93,16 @@ module Fluent
# for input_stream_ref use tag as-is - it will be processed/translated # for input_stream_ref use tag as-is - it will be processed/translated
# further on server by Wendelin # further on server by Wendelin
reference = tag reference = chunk.metadata.tag
@wendelin.ingest(reference, data_chunk) if @use_keep_alive
@wendelin.ingest_with_keep_alive(reference, data_chunk)
else
@wendelin.ingest(reference, data_chunk)
end
end end
end end
end end
...@@ -19,87 +19,171 @@ ...@@ -19,87 +19,171 @@
require 'net/http' require 'net/http'
require 'openssl' require 'openssl'
# class representing a Wendelin client # class representing a Wendelin client
class WendelinClient class WendelinClient
# `streamtool_uri` - URI pointing to portal_input_data_stream "mountpoint" # `streamtool_uri` - URI pointing to portal_input_data_stream "mountpoint"
# `credentials` # {'user' => _, 'password' => _} TODO change to certificate # `credentials` # {'user' => _, 'password' => _} TODO change to certificate
# `log` - logger to use # `log` - logger to use
def initialize(streamtool_uri, credentials, log) def initialize(streamtool_uri, credentials, log,
@streamtool_uri = streamtool_uri ssl_timeout, open_timeout, read_timeout, keep_alive_timeout)
@credentials = credentials @streamtool_uri = streamtool_uri
@log = log @credentials = credentials
@log = log
@ssl_timeout = ssl_timeout
@open_timeout = open_timeout
@read_timeout = read_timeout
@keep_alive_timeout = keep_alive_timeout
end end
# start request in an independent function to keep the connection open
def start_connection(uri)
@log.debug "start new connection"
@http = Net::HTTP.start(uri.hostname, uri.port,
:use_ssl => (uri.scheme == 'https'),
:verify_mode => OpenSSL::SSL::VERIFY_NONE,
# Net::HTTP default open timeout is infinity, which results
# in thread hang forever if other side does not fully
# establish connection. Default read_timeout is 60 seconds.
# We go safe way and make sure all timeouts are defined.
:ssl_timeout => @ssl_timeout,
:open_timeout => @open_timeout,
:read_timeout => @read_timeout,
:keep_alive_timeout => @keep_alive_timeout,)
end
# ingest `data_chunk` to a stream referenced as `reference` # ingest `data_chunk` to a stream referenced as `reference`
def ingest(reference, data_chunk) def ingest_with_keep_alive(reference, data_chunk)
uri = URI("#{@streamtool_uri}/ingest?reference=#{reference}") uri = URI("#{@streamtool_uri}/ingest?reference=#{reference}")
req = Net::HTTP::Post.new(uri) # call start_connection if http is undefined
if @credentials.has_key?('user') if ! defined? @http
req.basic_auth @credentials['user'], @credentials['password'] start_connection(uri)
end end
# TODO ensure content-type is 'raw', e.g. this way # connect again if the connection is not started
# (but then querystring ?reference=... is lost) if ! @http.started?()
# req.body = data_chunk start_connection(uri)
# req.content_type = 'application/octet-stream' end
req.set_form_data('data_chunk' => data_chunk)
@request = Net::HTTP::Post.new(uri)
@log.on_trace do
@log.trace '>>> REQUEST' # When using 'application/x-www-form-urlencoded', Ruby encodes with regex
@log.trace "method\t=> #{req.method}" # and it is far too slow. Such POST is legit:
@log.trace "path\t=> #{req.path}" # https://stackoverflow.com/a/14710450
@log.trace "uri\t=> #{req.uri}" @request.body = data_chunk
@log.trace "body\t=> #{req.body}" @request.content_type = 'application/octet-stream'
@log.trace "body_stream\t=> #{req.body_stream}"
req.each {|h| @log.trace "#{h}:\t#{req[h]}"} if @credentials.has_key?('user')
@log.trace @request.basic_auth @credentials['user'], @credentials['password']
end end
begin @log.trace do
# TODO keep connection open (so that every new ingest does not do @log.trace '>>> REQUEST'
# full connect again) @log.trace "method\t=> #{@request.method}"
res = Net::HTTP.start(uri.hostname, uri.port, @log.trace "path\t=> #{@request.path}"
:use_ssl => (uri.scheme == 'https'), @log.trace "uri\t=> #{@request.uri}"
# NOTE = "do not check server cert" @log.trace "body\t=> #{@request.body}"
# TODO move this out to conf parameters @log.trace "body_stream\t=> #{@request.body_stream}"
:verify_mode => OpenSSL::SSL::VERIFY_NONE, @request.each {|h| @log.trace "#{h}:\t#{@request[h]}"}
@log.trace
# Net::HTTP default open timeout is infinity, which results end
# in thread hang forever if other side does not fully
# establish connection. Default read_timeout is 60 seconds. begin
# We go safe way and make sure all timeouts are defined. res = @http.request(@request) # Net::HTTPResponse object
:ssl_timeout => 60, end
:open_timeout => 60,
:read_timeout => 60, rescue
) do |http| # some http/ssl/other connection error
http.request(req) @log.warn "HTTP ERROR:"
end raise
else
rescue @log.trace do
# some http/ssl/other connection error @log.trace '>>> RESPONSE'
@log.warn "HTTP ERROR:" res.each {|h| @log.trace "#{h}:\t#{res[h]}"}
raise @log.trace "code\t=> #{res.code}"
@log.trace "msg\t=> #{res.message}"
else @log.trace "class\t=> #{res.class}"
@log.on_trace do @log.trace "body:", res.body
@log.trace '>>> RESPONSE' end
res.each {|h| @log.trace "#{h}:\t#{res[h]}"}
@log.trace "code\t=> #{res.code}" if res.kind_of?(Net::HTTPSuccess) # res.code is 2XX
@log.trace "msg\t=> #{res.message}" #@log.info "ingested ok"
@log.trace "class\t=> #{res.class}" else
@log.trace "body:", res.body @log.warn "FAIL:"
end res.value
end
if res.kind_of?(Net::HTTPSuccess) # res.code is 2XX
#@log.info "ingested ok"
else
@log.warn "FAIL:"
res.value
end
end
end end
def ingest(reference, data_chunk)
uri = URI("#{@streamtool_uri}/ingest?reference=#{reference}")
req = Net::HTTP::Post.new(uri)
if @credentials.has_key?('user')
req.basic_auth @credentials['user'], @credentials['password']
end
# When using 'application/x-www-form-urlencoded', Ruby encodes with regex
# and it is far too slow. Such POST is legit:
# https://stackoverflow.com/a/14710450
req.body = data_chunk
req.content_type = 'application/octet-stream'
@log.trace do
@log.trace '>>> REQUEST'
@log.trace "method\t=> #{req.method}"
@log.trace "path\t=> #{req.path}"
@log.trace "uri\t=> #{req.uri}"
@log.trace "body\t=> #{req.body}"
@log.trace "body_stream\t=> #{req.body_stream}"
req.each {|h| @log.trace "#{h}:\t#{req[h]}"}
@log.trace
end
begin
# TODO keep connection open (so that every new ingest does not do
# full connect again)
res = Net::HTTP.start(uri.hostname, uri.port,
:use_ssl => (uri.scheme == 'https'),
# NOTE = "do not check server cert"
# TODO move this out to conf parameters
:verify_mode => OpenSSL::SSL::VERIFY_NONE,
# Net::HTTP default open timeout is infinity, which results
# in thread hang forever if other side does not fully
# establish connection. Default read_timeout is 60 seconds.
# We go safe way and make sure all timeouts are defined.
:ssl_timeout => @ssl_timeout,
:open_timeout => @open_timeout,
:read_timeout => @read_timeout,
) do |http|
http.request(req)
end
rescue
# some http/ssl/other connection error
@log.warn "HTTP ERROR:"
raise
else
@log.trace do
@log.trace '>>> RESPONSE'
res.each {|h| @log.trace "#{h}:\t#{res[h]}"}
@log.trace "code\t=> #{res.code}"
@log.trace "msg\t=> #{res.message}"
@log.trace "class\t=> #{res.class}"
@log.trace "body:", res.body
end
if res.kind_of?(Net::HTTPSuccess) # res.code is 2XX
#@log.info "ingested ok"
else
@log.warn "FAIL:"
res.value
end
end
end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment