Commit 894efb12 authored by Ophélie Gagnard

convert script: support sha256 instead of md5sum

parent 6ded4685
import numpy as np
import re
import json
import os.path
"""
GLOSSARY
Words defined here are marked with a " * ".
scan: a complete scan of a filesystem
"""
def get_end_and_json_list(start, in_data_stream, chunk_size = 16 * 1024 * 1024):
"""
Return:
- the index of the end of the scan*: @end
  - a list of strings assumed to be valid JSON strings: @json_string_list
(this assumes the scanning of the file system produces correct strings)
- a boolean telling whether the chunk reaches the end of the current scan*:
@is_end_of_scan.
"""
# assume max path size is 4096 and add 4096 for the rest
max_remaining_for_eol = 8192
end = min(start + chunk_size + max_remaining_for_eol, in_data_stream.getSize())
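  # read the bytes between @start and @end from the data stream; the result is
  # a list of chunks that is joined into a single string below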
unpacked = in_data_stream.readChunkList(start, end)
unpacked_string = "".join(unpacked)
# extending the current chunk until the next end of line,
# so json remains valid
if end < in_data_stream.getSize():
new_end_index = chunk_size
while unpacked_string[new_end_index] != '\n':
new_end_index += 1
end = start + new_end_index + 1
raw_data_string = unpacked_string[:end]
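  # a completed scan ends with a literal "[fluentbit_end]" line; if that
  # marker appears in the current chunk, the scan is complete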
end_scan_regexp = re.compile('.*?\[fluentbit_end\]\n', re.DOTALL)
scan_end = end_scan_regexp.match(raw_data_string)
if not scan_end:
is_end_of_scan = False
else:
is_end_of_scan = True
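    # stop at the end-of-scan marker: move @end just past it and drop
    # everything that follows it in this chunk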
end = start + len(scan_end.group()) + 1
raw_data_string = raw_data_string[:len(scan_end.group())]
line_list = raw_data_string.splitlines()
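  # every line is expected to look like "<tag>: <timestamp> [<json record>]";
  # group(1) captures the timestamp part and group(2) the JSON payload
  # between the brackets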
timestamp_json_regexp = re.compile(r'.*?:(.*?)\[(.*)\]')
json_string_list = [timestamp_json_regexp.match(line).group(2)
for line in line_list
if (timestamp_json_regexp.match(line) and (len(timestamp_json_regexp.match(line).groups()) == 2))]
return end, json_string_list, is_end_of_scan
def get_attribute_tuple_list(json_string_list, is_end_of_scan, is_node):
"""
  Parse @json_string_list and return a list of tuples:
  (path, slice1, slice2, slice3, slice4).
  path is the path of the processed file; sliceN are the four parts of the
  digest of the processed file. They are stored in "big endian" order,
  i.e. slice1 is the most significant part.
  Note: at the moment, parsing of sha256 is hardcoded.
  Note: timestamps are parsed in case they are needed for future
  improvements, but they are not used at the moment.
"""
if is_end_of_scan:
    # this line removes the "[fluentbit_end]" marker at the end of a scan
    # because it is not valid JSON
json_string_list = json_string_list[:-1]
tmp_data_list = []
fail_counter = 0
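  # decode each JSON string, logging and counting the ones that fail to parse
  # instead of aborting the whole chunk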
for json_string in json_string_list:
try:
tmp_data_list.append(json.loads(json_string))
except ValueError:
fail_counter += 1
context.log("Error number: " + str(fail_counter))
context.log(json_string)
context.log("\n****************\n")
data_list = []
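  # drop every entry whose path lies under one of the excluded prefixes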
for data in tmp_data_list:
in_list = False
if ('path' in data) and exclude_file_list:
      # each file coming from the target root filesystem of a node begins
      # with "/sysroot"; the next two lines remove that prefix
if not is_node:
data['path'] = data['path'][8:]
for exclude_file in exclude_file_list:
if os.path.commonprefix([data['path'], exclude_file]) == exclude_file:
in_list = True
break
if not in_list:
data_list.append(data)
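  # keep only records that carry both a path and a sha256 digest; the first
  # 32 hex characters of the digest are split into four 32-bit integers,
  # most significant slice first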
return [(data['path'],
int(data['hash']['sha256'][0:8], 16),
int(data['hash']['sha256'][8:16], 16),
int(data['hash']['sha256'][16:24], 16),
int(data['hash']['sha256'][24:32], 16),
)
for data in data_list
if 'path' in data and 'hash' in data and 'sha256' in data['hash']]
def get_uid_list(attribute_tuple_list, data_stream):
"""
Fill the mappings and get the list of UIDs.
The argument @data_stream is only used to access the mappings.
"""
uid_list = []
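  # register every attribute tuple in the data stream's mapping, then look up
  # the UID it was assigned (the variable is named "triplet" although the
  # tuples carry five elements)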
for triplet in attribute_tuple_list:
data_stream.add_path(triplet, 'test')
triplet_uid = data_stream.get_uid_from_path(triplet, 'test')
uid_list += [triplet_uid]
return uid_list
def create_ndarray(uid_list):
"""
  Take a list of UIDs and return it as a ndarray of UIDs.
This function exists so that the stages of the data processing are clear and
so that if further transformations on the data are needed, one can simply
add them here without reorganizing the code.
"""
uid_list.append(-1) # used as a delimiter between the scans
return np.ndarray((len(uid_list),), 'int64', np.array(uid_list))
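# main body of the script: context, in_stream and out_array are expected to be
# bound by the caller rather than defined here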
progress_indicator = in_stream["Progress Indicator"]
try:
in_data_stream = in_stream["Data Stream"]
except KeyError:
return
#if in_data_stream.getReference() != "douai005-capri009-douai005-capri009":
# context.log("reference = ", in_data_stream.getReference())
# context.log("returning")
# return
#context.log("reference = ", in_data_stream.getReference())
#context.log("not returning")
out_data_array = out_array["Data Array"]
exclude_file_list = []
out_data_array.setPublicationSectionList(in_data_stream.getPublicationSectionList())
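# streams tagged as "file_system_image/reference_image" describe the reference
# filesystem image; any other stream is treated as a scan coming from a node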
if 'file_system_image/reference_image' in in_data_stream.getPublicationSectionList():
is_node = False
if out_data_array.getValidationState() == 'draft':
out_data_array.validate()
else:
is_node = True
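# if the Data Array is not yet linked to a resource, take the resource of the
# ingestion line, store it as causality and fetch that resource's exclude
# file list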
if not out_data_array.getCausality():
ingestion_line = in_data_stream.getAggregateRelatedValue(portal_type='Data Ingestion Line')
resource = ingestion_line.getResource()
exclude_file_list = ingestion_line.getResourceValue().DataProduct_getExcludeFileList()
out_data_array.edit(causality=resource)
start = progress_indicator.getIntOffsetIndex()
end = in_data_stream.getSize()
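# nothing to do if the progress indicator already reached the end of the stream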
if start >= end:
return
end, json_string_list, is_end_of_scan = get_end_and_json_list(start, in_data_stream)
attribute_tuple_list = get_attribute_tuple_list(json_string_list, is_end_of_scan, is_node)
context.log("reference = ", in_data_stream.getReference())
context.log("length of attribute tuple list = ", len(attribute_tuple_list))
uid_list = get_uid_list(attribute_tuple_list, in_data_stream)
uid_ndarray = create_ndarray(uid_list)
context.log("start =", start)
context.log("end =", end)
if start == 0:
zbigarray = None
else:
zbigarray = out_data_array.getArray()
if zbigarray is None:
zbigarray = out_data_array.initArray(shape=(0,), dtype='int64')
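# append the newly computed UIDs (terminated by the -1 scan delimiter) to the
# persistent array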
if len(uid_ndarray) > 0:
zbigarray.append(uid_ndarray)
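# record how far the stream has been processed so that the next activity
# resumes from this offset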
if end > start:
progress_indicator.setIntOffsetIndex(end)
# tell caller to create new activity after processing
# if we did not reach end of stream
if end < in_data_stream.getSize():
return 1