Commit 79809330 authored by Sasha Goldshtein's avatar Sasha Goldshtein

bcc: Add test for tracepoint support

The test asserts that we can enable the sched_switch tracepoint and read
some events from it. The test is also marked to require kernel 4.7 or
later, because that's where the BPF support for tracepoints was introduced.
parent 1198c3c6
......@@ -50,6 +50,8 @@ add_test(NAME py_uprobes WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
COMMAND ${TEST_WRAPPER} py_uprobes sudo ${CMAKE_CURRENT_SOURCE_DIR}/test_uprobes.py)
add_test(NAME py_test_stackid WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
COMMAND ${TEST_WRAPPER} py_stackid sudo ${CMAKE_CURRENT_SOURCE_DIR}/test_stackid.py)
add_test(NAME py_test_tracepoint WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
COMMAND ${TEST_WRAPPER} py_test_tracepoint sudo ${CMAKE_CURRENT_SOURCE_DIR}/test_tracepoint.py)
add_test(NAME py_test_dump_func WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
COMMAND ${TEST_WRAPPER} py_dump_func simple ${CMAKE_CURRENT_SOURCE_DIR}/test_dump_func.py)
#!/usr/bin/env python
# Copyright (c) Sasha Goldshtein
# Licensed under the Apache License, Version 2.0 (the "License")
import bcc
import unittest
from time import sleep
import distutils.version
import os
def kernel_version_ge(major, minor):
# True if running kernel is >= X.Y
version = distutils.version.LooseVersion(os.uname()[2]).version
if version[0] > major:
return True
if version[0] < major:
return False
if minor and version[1] < minor:
return False
return True
@unittest.skipUnless(kernel_version_ge(4,7), "requires kernel >= 4.7")
class TestTracepoint(unittest.TestCase):
def test_tracepoint(self):
text = """#include <linux/ptrace.h>
struct tp_args {
unsigned long long __unused__;
char prev_comm[16];
pid_t prev_pid;
int prev_prio;
long prev_state;
char next_comm[16];
pid_t next_pid;
int next_prio;
};
BPF_HASH(switches, u32, u64);
int probe_switch(struct tp_args *args) {
if (args == 0)
return 0;
u64 val = 0;
u32 pid = args->next_pid;
u64 *existing = switches.lookup_or_init(&pid, &val);
(*existing)++;
return 0;
}
"""
b = bcc.BPF(text=text)
b.attach_tracepoint("sched:sched_switch", "probe_switch")
sleep(1)
total_switches = 0
for k, v in b["switches"].items():
total_switches += v.value
self.assertNotEqual(0, total_switches)
b.detach_tracepoint("sched:sched_switch")
if __name__ == "__main__":
unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment