Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
F
fluentbit-plugin-wendelin
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
nexedi
fluentbit-plugin-wendelin
Commits
284e0d83
Commit
284e0d83
authored
Oct 16, 2023
by
Jérome Perrin
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
go fmt
parent
f790463a
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
63 additions
and
63 deletions
+63
-63
src/fluentbit_wendelin.go
src/fluentbit_wendelin.go
+63
-63
No files found.
src/fluentbit_wendelin.go
View file @
284e0d83
package
main
package
main
import
(
import
(
"bytes"
"C"
"C"
"bytes"
"crypto/tls"
"fmt"
"fmt"
"os"
"unsafe"
"net/http"
"crypto/tls"
"strings"
"github.com/fluent/fluent-bit-go/output"
"github.com/fluent/fluent-bit-go/output"
"net/http"
"os"
"strings"
"unsafe"
)
)
//export FLBPluginRegister
//export FLBPluginRegister
...
@@ -22,22 +22,22 @@ func FLBPluginRegister(def unsafe.Pointer) int {
...
@@ -22,22 +22,22 @@ func FLBPluginRegister(def unsafe.Pointer) int {
// plugin (context) pointer to fluentbit context (state/ c code)
// plugin (context) pointer to fluentbit context (state/ c code)
func
FLBPluginInit
(
plugin
unsafe
.
Pointer
)
int
{
func
FLBPluginInit
(
plugin
unsafe
.
Pointer
)
int
{
streamtool_uri
:=
output
.
FLBPluginConfigKey
(
plugin
,
"streamtool_uri"
)
streamtool_uri
:=
output
.
FLBPluginConfigKey
(
plugin
,
"streamtool_uri"
)
user
:=
output
.
FLBPluginConfigKey
(
plugin
,
"user"
)
user
:=
output
.
FLBPluginConfigKey
(
plugin
,
"user"
)
password
:=
output
.
FLBPluginConfigKey
(
plugin
,
"password"
)
password
:=
output
.
FLBPluginConfigKey
(
plugin
,
"password"
)
buffer_type
:=
output
.
FLBPluginConfigKey
(
plugin
,
"buffer_type"
)
buffer_type
:=
output
.
FLBPluginConfigKey
(
plugin
,
"buffer_type"
)
flush_interval
:=
output
.
FLBPluginConfigKey
(
plugin
,
"flush_interval"
)
flush_interval
:=
output
.
FLBPluginConfigKey
(
plugin
,
"flush_interval"
)
disable_retry_limit
:=
output
.
FLBPluginConfigKey
(
plugin
,
"disable_retry_limit"
)
disable_retry_limit
:=
output
.
FLBPluginConfigKey
(
plugin
,
"disable_retry_limit"
)
reference
:=
output
.
FLBPluginConfigKey
(
plugin
,
"reference"
)
reference
:=
output
.
FLBPluginConfigKey
(
plugin
,
"reference"
)
dict
:=
map
[
string
]
string
{
dict
:=
map
[
string
]
string
{
"streamtool_uri"
:
streamtool_uri
,
"streamtool_uri"
:
streamtool_uri
,
"user"
:
user
,
"user"
:
user
,
"password"
:
password
,
"password"
:
password
,
"buffer_type"
:
buffer_type
,
"buffer_type"
:
buffer_type
,
"flush_interval"
:
flush_interval
,
"flush_interval"
:
flush_interval
,
"disable_retry_limit"
:
disable_retry_limit
,
"disable_retry_limit"
:
disable_retry_limit
,
"reference"
:
reference
,
"reference"
:
reference
,
}
}
output
.
FLBPluginSetContext
(
plugin
,
dict
)
output
.
FLBPluginSetContext
(
plugin
,
dict
)
return
output
.
FLB_OK
return
output
.
FLB_OK
}
}
...
@@ -47,11 +47,11 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
...
@@ -47,11 +47,11 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
var
record
map
[
interface
{}]
interface
{}
var
record
map
[
interface
{}]
interface
{}
// Create Fluent Bit decoder
// Create Fluent Bit decoder
dec
:=
output
.
NewDecoder
(
data
,
int
(
length
))
dec
:=
output
.
NewDecoder
(
data
,
int
(
length
))
dict
:=
output
.
FLBPluginGetContext
(
ctx
)
.
(
map
[
string
]
string
)
dict
:=
output
.
FLBPluginGetContext
(
ctx
)
.
(
map
[
string
]
string
)
// Iterate Records
// Iterate Records
var
result
string
var
result
string
result
=
""
result
=
""
var
is_end
bool
=
false
var
is_end
bool
=
false
for
{
for
{
// Extract Record
// Extract Record
ret
,
_
,
record
=
output
.
GetRecord
(
dec
)
ret
,
_
,
record
=
output
.
GetRecord
(
dec
)
...
@@ -61,16 +61,16 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
...
@@ -61,16 +61,16 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
// Print record keys and values
// Print record keys and values
for
_
,
v
:=
range
record
{
for
_
,
v
:=
range
record
{
var
output_string
string
=
""
var
output_string
string
=
""
for
_
,
s
:=
range
v
.
([]
uint8
)
{
for
_
,
s
:=
range
v
.
([]
uint8
)
{
output_string
=
output_string
+
string
(
s
)
output_string
=
output_string
+
string
(
s
)
}
}
if
strings
.
Contains
(
output_string
,
"fluentbit_end"
)
{
if
strings
.
Contains
(
output_string
,
"fluentbit_end"
)
{
is_end
=
true
is_end
=
true
}
}
result
=
result
+
output_string
result
=
result
+
output_string
}
}
result
+=
"
\n
"
result
+=
"
\n
"
}
}
// Return options:
// Return options:
...
@@ -78,35 +78,35 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
...
@@ -78,35 +78,35 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
// output.FLB_OK = data have been processed.
// output.FLB_OK = data have been processed.
// output.FLB_ERROR = unrecoverable error, do not try this again.
// output.FLB_ERROR = unrecoverable error, do not try this again.
// output.FLB_RETRY = retry to flush later.
// output.FLB_RETRY = retry to flush later.
//body result
//body result
// content type "application/octet-stream"
// content type "application/octet-stream"
var
b
=
[]
byte
(
result
)
var
b
=
[]
byte
(
result
)
uri
:=
fmt
.
Sprintf
(
"%s/ingest?reference=%s"
,
dict
[
"streamtool_uri"
],
dict
[
"reference"
])
uri
:=
fmt
.
Sprintf
(
"%s/ingest?reference=%s"
,
dict
[
"streamtool_uri"
],
dict
[
"reference"
])
client
:=
&
http
.
Client
{
client
:=
&
http
.
Client
{
Transport
:
&
http
.
Transport
{
Transport
:
&
http
.
Transport
{
TLSClientConfig
:
&
tls
.
Config
{
InsecureSkipVerify
:
true
},
TLSClientConfig
:
&
tls
.
Config
{
InsecureSkipVerify
:
true
},
},
},
}
}
req
,
err
:=
http
.
NewRequest
(
"POST"
,
uri
,
bytes
.
NewReader
(
b
))
req
,
err
:=
http
.
NewRequest
(
"POST"
,
uri
,
bytes
.
NewReader
(
b
))
if
err
!=
nil
{
if
err
!=
nil
{
fmt
.
Fprintf
(
os
.
Stderr
,
"Got error %s"
,
err
.
Error
())
fmt
.
Fprintf
(
os
.
Stderr
,
"Got error %s"
,
err
.
Error
())
return
output
.
FLB_RETRY
return
output
.
FLB_RETRY
}
}
req
.
SetBasicAuth
(
dict
[
"user"
],
dict
[
"password"
])
req
.
SetBasicAuth
(
dict
[
"user"
],
dict
[
"password"
])
req
.
Header
.
Set
(
"Content-Type"
,
"application/octet-stream"
)
req
.
Header
.
Set
(
"Content-Type"
,
"application/octet-stream"
)
rsp
,
err
:=
client
.
Do
(
req
)
rsp
,
err
:=
client
.
Do
(
req
)
if
err
!=
nil
{
if
err
!=
nil
{
fmt
.
Fprintf
(
os
.
Stderr
,
"got error %s"
,
err
.
Error
())
fmt
.
Fprintf
(
os
.
Stderr
,
"got error %s"
,
err
.
Error
())
return
output
.
FLB_RETRY
return
output
.
FLB_RETRY
}
}
if
rsp
.
StatusCode
!=
204
{
if
rsp
.
StatusCode
!=
204
{
fmt
.
Fprintf
(
os
.
Stderr
,
"status code %d"
,
rsp
.
StatusCode
)
fmt
.
Fprintf
(
os
.
Stderr
,
"status code %d"
,
rsp
.
StatusCode
)
return
output
.
FLB_RETRY
return
output
.
FLB_RETRY
}
}
if
is_end
{
if
is_end
{
os
.
Exit
(
0
)
os
.
Exit
(
0
)
}
}
return
output
.
FLB_OK
return
output
.
FLB_OK
}
}
//export FLBPluginExit
//export FLBPluginExit
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment