ebulk: new features and fixes

- minor fixes in storage inputs
- retry on failed connections
- resume interrupted split operations for download (see the sketch below)
- control of first ingestion / partial ingestion
- report messages when storage ingestion finishes
- better handling of exceptions and messages
parent 1e8783af
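
The split-resume bookkeeping added here (saveSplitOperation, getLastSplitOperation and the .control-split-operation marker files in dataset_utils.rb) amounts to a small per-file state machine. The standalone Ruby sketch below is an illustration only, not the shipped code: the ";" separator, the file-name prefixes and the method names are simplified stand-ins for the real RECORD_SEPARATOR, SPLIT_FILE and SPLIT_CONTROL_FILE constants.

```ruby
# Standalone sketch of the per-file split-resume bookkeeping (illustrative only).
# Assumptions: ";" stands in for the real RECORD_SEPARATOR, and the file-name
# prefixes stand in for SPLIT_FILE / SPLIT_CONTROL_FILE from dataset_utils.rb.

require "fileutils"

class SplitResumeSketch
  SEPARATOR = ";"
  EOF_MARK  = "EOF"

  def initialize(dir)
    @split_prefix   = File.join(dir, ".split-operation")
    @control_prefix = File.join(dir, ".control-split-operation")
  end

  # Called right before a chunk is handed to the page builder: if the process
  # dies here, the marker is still present on the next run.
  def mark_chunk_in_flight(reference)
    FileUtils.touch(control_file(reference))
  end

  # Called after a chunk went through: remember the last completed chunk index
  # and clear the in-flight marker.
  def save_progress(operation, reference, chunk_index, hash, chunk_size)
    record = [operation, reference, chunk_index, hash, chunk_size].join(SEPARATOR)
    File.write(progress_file(reference), record + "\n")
    FileUtils.rm_f(control_file(reference))
  end

  # Chunk index to resume from (0 = start over). Progress is only reused when
  # operation, reference, hash and chunk size all match, the file was not fully
  # transferred, and the previous run was not killed in the middle of a chunk.
  def resume_index(operation, reference, hash, chunk_size)
    return 0 unless File.exist?(progress_file(reference))
    op, ref, last_chunk, saved_hash, saved_size =
      File.read(progress_file(reference)).chomp.split(SEPARATOR)
    return 0 unless op == operation && ref == reference &&
                    saved_hash == hash && saved_size == chunk_size.to_s
    return 0 if last_chunk == EOF_MARK
    if File.exist?(control_file(reference))        # interrupted mid-chunk: restart
      FileUtils.rm_f(progress_file(reference))
      FileUtils.rm_f(control_file(reference))
      return 0
    end
    mark_chunk_in_flight(reference)                # resumed run starts a new chunk
    last_chunk.to_i
  end

  private

  def progress_file(reference)
    @split_prefix + "__" + reference.gsub("/", "__")
  end

  def control_file(reference)
    @control_prefix + "__" + reference.gsub("/", "__")
  end
end
```

For each chunk the plugins then mark it as in flight, hand it to the page builder, record progress, and delete both files once the whole file has gone through; a marker file left behind means the previous run died mid-chunk, so the transfer restarts from zero.
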
......@@ -151,13 +151,19 @@ function updateConfigFile {
echo
OPTION=""
if [ "$ADVANCED" = true ] ; then
echo "[INFO] If you want to edit the configuration file by yourself (advanced) please type CONFIG, otherwise press enter to continue."
echo "[INFO] If you want to edit the configuration file by yourself (advanced) please type YES, otherwise press enter to continue."
echo -n "** Open configuration file? ('YES' to confirm): "
read -e OPTION
if [ "$OPTION" = "CONFIG" ] ; then
if [ "$OPTION" = "YES" ] ; then
CUSTOM=true
fi
fi
if [ "$OPTION" != "CONFIG" ] ; then
if [ "$OPTION" != "YES" ] ; then
if [ "$ADVANCED" = true ] ; then
echo
echo "** Please enter the information needed to configure the storage **"
echo
fi
$PARAMETER_FUNCTION
if [ "$ADVANCED" = false ] ; then
echo "** If you need a more advanced storage configuration, you can run the tool with the parameter --advanced **"
......@@ -231,7 +237,7 @@ function runProcess {
if [ ! -d $LOG_DIR ]; then
mkdir $LOG_DIR 2>/dev/null
fi
if [ -z "$RESUME_STORAGE_INGESTION" ]; then
if [ "$RESTART_STORAGE_INGESTION" ]; then
rm -f ${DIFF_FILE} 2>/dev/null
fi
$embulk run -I $TOOL_PATH/embulk-wendelin-dataset-tool/lib $FILE $DIFF_COMMAND 2> "$LOG_DIR/error.log" || {
......@@ -322,9 +328,7 @@ function checkStoragePlugin {
if [ "$STORAGE_GEM" != "" ] ; then
echo -n "[INFO] Checking if '$STORAGE' plugin is installed... "
if [ "$CUSTOM" = false ] ; then
sleep 1
echo -e "${GREEN}OK${NC}"
sleep 1
else
$embulk gem list 2>/dev/null | grep -q "$STORAGE_GEM" 2>/dev/null
if [ $? == 0 ]; then
......@@ -386,12 +390,15 @@ function askFTPparameters {
echo -e "${ORANGE}[ERROR] Empty host.${NC}"
exit
fi
FTP_HOST="${FTP_HOST/ftp:\/\//$empty}"
if [[ $FTP_HOST == *"/"* ]]; then
echo -e "${ORANGE}[ERROR] Please, enter only the ftp host, without '/' or path. Path will be requested after.${NC}"
exit
fi
echo "Path prefix:"
echo "* (e.g. /mydata/sample/dir/) *"
read -e FTP_PATH
if [ "$FTP_PATH" = "" ] ; then
FTP_PATH="/"
fi
FTP_PATH="/$FTP_PATH"
echo "FTP User:"
echo "* you can leave this input empty and anonymous authentication will be used *"
read -e FTP_USER
......@@ -493,7 +500,7 @@ while [ "$1" != "" ]; do
;;
-a | --advanced ) ADVANCED=true
;;
-rs | --resume ) RESUME_STORAGE_INGESTION=true
-rs | --restart ) RESTART_STORAGE_INGESTION=true
;;
-dc | --discard-changes ) DISCARD_CHANGES=true
;;
......
......@@ -9,9 +9,12 @@ class DatasetUtils
DATASET_TEMP_REPORT_FILE = ".temp-dataset-task-report"
DATASET_COMPLETED_FILE = ".dataset-completed"
RESUME_OPERATION_FILE = ".resume-operation"
INITIAL_INGESTION_FILE = ".initial-ingestion"
PARTIAL_INGESTION_FILE = ".partial-ingestion"
STAGED_FILE = ".staged"
DISCARD_CHANGES_FILE = ".discard-changes"
SPLIT_FILE = ".split-operation"
SPLIT_CONTROL_FILE = ".control-split-operation"
FIRST_INGESTION_FILE = ".first-ingestion"
RUN_DONE = "done"
RUN_ERROR = "error"
......@@ -19,6 +22,7 @@ class DatasetUtils
DELETE = "DELETE"
RENAME = "RENAME"
INGESTION = "ingestion"
DOWNLOAD = "download"
ADD = "add"
REMOVE = "remove"
STATUS_NEW = "new"
......@@ -45,15 +49,18 @@ class DatasetUtils
DATE_FORMAT = "%Y-%m-%d-%H-%M-%S"
def initialize(data_set_directory)
@data_set_directory = data_set_directory
@data_set_directory = appendSlashTo(data_set_directory)
@logger = LogManager.instance()
@task_report_file = @data_set_directory + DATASET_REPORT_FILE
@temp_report_file = @data_set_directory + DATASET_TEMP_REPORT_FILE
@completed_file = @data_set_directory + DATASET_COMPLETED_FILE
@resume_operation_file = @data_set_directory + RESUME_OPERATION_FILE
@initial_ingestion_file = @data_set_directory + INITIAL_INGESTION_FILE
@partial_ingestion_file = @data_set_directory + PARTIAL_INGESTION_FILE
@first_ingestion_file = @data_set_directory + FIRST_INGESTION_FILE
@staged_file = @data_set_directory + STAGED_FILE
@discard_changes_file = @data_set_directory + DISCARD_CHANGES_FILE
@split_file = @data_set_directory + SPLIT_FILE
@split_control_file = @data_set_directory + SPLIT_CONTROL_FILE
end
def getLocalPaths(paths)
......@@ -207,7 +214,7 @@ class DatasetUtils
end
end
def showTaskErrors(failed_tasks)
def showTaskErrors(failed_tasks, storage=FALSE)
puts
@logger.error("The following files could not be processed. Please check the details in the log file: " + @logger.getLogPath(), print=TRUE)
if failed_tasks.length > 15
......@@ -217,10 +224,77 @@ class DatasetUtils
else
@logger.error(failed_tasks, print=TRUE)
end
if storage
puts
@logger.info("You can retry the operation for those files by running your command again with parameter --restart .", print=TRUE)
else
@logger.info("You can retry the operation for those files.", print=TRUE)
end
puts
end
def splitOperationControlFileExist(reference)
file_reference = reference.gsub('/', '__')
return File.exist?(@split_control_file + "__" + file_reference)
end
def createSplitOperationControlFile(reference)
file_reference = reference.gsub('/', '__')
File.open(@split_control_file + "__" + file_reference, 'w') {}
end
def deleteSplitOperationControlFile(reference=nil)
if reference.nil?
Dir.glob("#{@split_control_file}*").each { |file| File.delete(file) }
else
file_reference = reference.gsub('/', '__')
File.delete(@split_control_file + "__" + file_reference) if File.exist?(@split_control_file + "__" + file_reference)
end
end
def deleteSplitOperationFile(reference=nil)
if reference.nil?
Dir.glob("#{@split_file}*").each { |file| File.delete(file) }
else
file_reference = reference.gsub('/', '__')
File.delete(@split_file + "__" + file_reference) if File.exist?(@split_file + "__" + file_reference)
end
end
def saveSplitOperation(operation, reference, eof, hash, chunk_size)
file_reference = reference.gsub('/', '__')
record = [operation, reference, eof, hash, Integer(chunk_size)].join(RECORD_SEPARATOR)
File.open(@split_file + "__" + file_reference, 'w') { |file| file.puts(record) }
deleteSplitOperationControlFile(reference)
end
def splitOperationFileExist(reference)
file_reference = reference.gsub('/', '__')
return File.exist?(@split_file + "__" + file_reference)
end
def getLastSplitOperation(operation, reference, hash, chunk_size)
file_reference = reference.gsub('/', '__')
return 0 if not File.exist?(@split_file + "__" + file_reference)
record = File.open(@split_file + "__" + file_reference).read.chomp.split(RECORD_SEPARATOR)
if record[0] == operation && record[1] == reference && record[3] == hash && record[4] == Integer(chunk_size).to_s && record[2] != EOF
# discard if user interrupted (ctrl+c) the operation
if splitOperationControlFileExist(file_reference)
@logger.warn("Previous split operation attempt for file #{reference} was interrupt by user (aborted tool execution), it will be restarted.", print=TRUE)
deleteSplitOperationFile(file_reference)
deleteSplitOperationControlFile(file_reference)
return 0
end
createSplitOperationControlFile(file_reference)
return record[2].to_i
end
return 0
rescue Exception => e
@logger.error("An error occurred in getLastSplitOperation method:" + e.to_s)
@logger.error(e.backtrace)
return 0
end
def deleteDiscardChangesFile()
File.delete(@discard_changes_file) if File.exist?(@discard_changes_file)
end
......@@ -255,16 +329,28 @@ class DatasetUtils
end
end
def deleteInitialIngestionFile()
File.delete(@initial_ingestion_file) if File.exist?(@initial_ingestion_file)
def deleteFirstIngestionFile()
File.delete(@first_ingestion_file) if File.exist?(@first_ingestion_file)
end
def createFirstIngestionFile()
File.open(@first_ingestion_file, 'w') {}
end
def firstIngestionFileExist()
return File.exist?(@first_ingestion_file)
end
def deletePartialIngestionFile()
File.delete(@partial_ingestion_file) if File.exist?(@partial_ingestion_file)
end
def createInitialIngestionFile()
File.open(@initial_ingestion_file, 'w') {}
def createPartialIngestionFile()
File.open(@partial_ingestion_file, 'w') {}
end
def initialIngestionFileExist()
return File.exist?(@initial_ingestion_file)
def partialIngestionFileExist()
return File.exist?(@partial_ingestion_file)
end
def stagedFileExist()
......
......@@ -22,7 +22,7 @@ module Embulk
]
def self.status(task, push=FALSE)
partial_ingestion = @dataset_utils.initialIngestionFileExist()
partial_ingestion = @dataset_utils.partialIngestionFileExist()
staged_changes, untracked_changes = @dataset_utils.getLocalChanges(task['paths'], task['data_set'], staged=TRUE, partial_ingestion=partial_ingestion)
staged = (not staged_changes.empty?)
task['paths'] = staged ? staged_changes : untracked_changes
......@@ -96,7 +96,7 @@ module Embulk
@status = @status == "" ? FALSE : @status
@dataset_utils.deleteDiscardChangesFile()
if @status
if not @dataset_utils.initialIngestionFileExist()
if not @dataset_utils.partialIngestionFileExist()
if not @dataset_utils.reportFileExist()
puts
@logger.error("The dataset directory does not contain a valid dataset.", print=TRUE)
......@@ -123,11 +123,12 @@ module Embulk
@logger.abortExecution()
end
task['data_streams'] = data_stream_dict["result"]
first_ingestion = task['data_streams'].length == 0
if not @dataset_utils.reportFileExist()
@dataset_utils.createInitialIngestionFile()
@dataset_utils.createPartialIngestionFile()
@dataset_utils.createFirstIngestionFile() if first_ingestion
else
if not @dataset_utils.initialIngestionFileExist()
if not @dataset_utils.partialIngestionFileExist()
@logger.info("Checking local dataset...", print=TRUE)
if not @dataset_utils.reportUpToDate(data_stream_dict, @data_set)
puts
......@@ -188,12 +189,17 @@ module Embulk
task_reports = yield(task, columns, count)
next_config_diff = task_reports.map{|hash| hash[DatasetUtils::RUN_DONE]}.flatten.compact
@dataset_utils.showTaskReport(next_config_diff)
element_output = @dataset_utils.initialIngestionFileExist() ? "new file" : "change"
element_output = @dataset_utils.partialIngestionFileExist() ? "new file" : "change"
@logger.info("#{next_config_diff.length} #{element_output}(s) ingested.", print=TRUE)
if(next_config_diff.length == count)
@logger.info("Dataset successfully ingested.", print=TRUE)
@wendelin.increaseDatasetVersion(@data_set)
@dataset_utils.deleteStagedFile()
if @dataset_utils.firstIngestionFileExist()
@dataset_utils.createCompletedFile()
@dataset_utils.deletePartialIngestionFile()
@dataset_utils.deleteFirstIngestionFile()
end
else
failed_tasks = task_reports.map{|hash| hash[DatasetUtils::RUN_ERROR] || hash[DatasetUtils::RUN_ABORTED] }.flatten.compact
@dataset_utils.showTaskErrors(failed_tasks)
......@@ -221,6 +227,7 @@ module Embulk
hash = file_dict["hash"]
delete = hash == DatasetUtils::DELETE
rename = file_dict["status"] == DatasetUtils::STATUS_RENAMED
split = FALSE
if size == "" and hash == "" #new file
size = File.size(path)
hash = @dataset_utils.getHash(path)
......@@ -229,10 +236,17 @@ module Embulk
filename, extension, reference = @dataset_utils.getPathInfo(path, @dataset)
operation = rename ? DatasetUtils::RENAME : DatasetUtils::INGESTION
@dataset_utils.saveCurrentOperation(operation, reference, new_reference)
each_chunk(path, filename, extension, size, hash, schema[1..-1].map{|elm| elm.name}, @chunk_size, delete, new_reference) do |entry|
resume_split = @dataset_utils.splitOperationFileExist(reference) ? @dataset_utils.getLastSplitOperation(operation, reference, hash, @chunk_size) : 0
each_chunk(path, filename, extension, size, hash, schema[1..-1].map{|elm| elm.name}, @chunk_size, delete, new_reference, resume_split) do |entry|
@dataset_utils.createSplitOperationControlFile(reference) if split
@page_builder.add(entry)
if ! delete && ! rename && entry[5] != ""
split = TRUE
@dataset_utils.saveSplitOperation(operation, reference, entry[5], hash, @chunk_size)
end
end
@page_builder.finish
@dataset_utils.deleteSplitOperationFile(reference) if split
rescue java.lang.OutOfMemoryError
@logger.logOutOfMemoryError(path)
return_value = DatasetUtils::RUN_ABORTED
......@@ -244,6 +258,7 @@ module Embulk
else
return_value = DatasetUtils::RUN_DONE
end
@dataset_utils.deleteSplitOperationControlFile(reference) if return_value != DatasetUtils::RUN_DONE
if return_value == DatasetUtils::RUN_DONE
if delete
if @dataset_utils.reportFileExist()
......@@ -261,7 +276,7 @@ module Embulk
private
def each_chunk(path, filename, extension, size, hash, fields, chunk_size=DatasetUtils::CHUNK_SIZE, delete=FALSE, new_reference=FALSE)
def each_chunk(path, filename, extension, size, hash, fields, chunk_size=DatasetUtils::CHUNK_SIZE, delete=FALSE, new_reference=FALSE, resume_split=0)
if delete
File.delete(path) if File.exist?(path)
values = [@supplier, @dataset, filename, extension, "", DatasetUtils::DELETE, "", ""]
......@@ -284,13 +299,19 @@ module Embulk
end
break
end
if resume_split > 0
@logger.info("Resuming interrupted split ingestion", print=TRUE)
first = FALSE
file_object.seek(chunk_size*resume_split+resume_split) # skip the resume_split chunks already ingested (each chunk carries chunk_size + 1 bytes: its lookahead byte plus chunk_size)
next_byte = file_object.read(1)
data = next_byte
npart = resume_split
resume_split = 0
end
data += file_object.read(chunk_size)
next_byte = file_object.read(1)
if not next_byte
eof = DatasetUtils::EOF
if first # this means that the whole file will be ingested at once (not split)
eof = ""
end
eof = first ? "" : DatasetUtils::EOF
else
npart += 1
eof = npart.to_s.rjust(3, "0")
......
......@@ -68,6 +68,8 @@ module Embulk
@logger.info("Your downloaded dataset is already up to date.", print=TRUE)
end
when DOWNLOAD
@dataset_utils.deleteSplitOperationControlFile()
@dataset_utils.deleteSplitOperationFile()
@logger.info("Checking remote files and posible local conflicts...", print=TRUE)
self.warnConflicts(task['data_streams'], task['data_set'])
@dataset_utils.deleteCompletedFile()
......@@ -112,7 +114,7 @@ module Embulk
task['data_set_directory'] = @dataset_utils.appendSlashTo(@output_path)
@data_set_directory = task['data_set_directory']
@dataset_utils = DatasetUtils.new(@data_set_directory)
if @dataset_utils.reportFileExist() && @dataset_utils.completedFileExist() && @dataset_utils.discardChangesFileExist() && ! @dataset_utils.initialIngestionFileExist()
if @dataset_utils.reportFileExist() && @dataset_utils.completedFileExist() && @dataset_utils.discardChangesFileExist() && ! @dataset_utils.partialIngestionFileExist()
task['discard_changes'] = @dataset_utils.discardChangesFileExist()
local_changes = @dataset_utils.getRemoteFileListForDiscardLocalChanges([], @data_set, check_changes=TRUE)
if local_changes.empty?
......@@ -159,7 +161,7 @@ module Embulk
puts
self.askUserForAction(task, action=UPDATE)
end
elsif not @dataset_utils.initialIngestionFileExist()
elsif not @dataset_utils.partialIngestionFileExist()
puts
@logger.info("There was a previous attempt to download this dataset but it did not finish successfully.", print=TRUE)
@logger.info("What do you want to do?", print=TRUE)
......@@ -195,7 +197,7 @@ module Embulk
end
@dataset_utils.createReportFile()
end
@dataset_utils.deleteInitialIngestionFile()
@dataset_utils.deletePartialIngestionFile()
@dataset_utils.deleteDiscardChangesFile()
columns = [
Column.new(0, "reference", :string),
......@@ -254,34 +256,42 @@ module Embulk
def run
data_stream = task['data_streams'][@index]
id = data_stream["id"]
ref = data_stream["reference"]
reference = data_stream["reference"]
size = data_stream["size"]
hash = data_stream["hash"]
renamed = data_stream["status"] == DatasetUtils::STATUS_RENAMED
deleted = hash.to_s == DatasetUtils::DELETE
begin
if deleted
entry = [ref, "", @data_set, DatasetUtils::DELETE, renamed]
entry = [reference, "", @data_set, DatasetUtils::DELETE, renamed]
page_builder.add(entry)
elsif renamed
new_reference = data_stream["new_reference"]
entry = [ref, new_reference, @data_set, TRUE, renamed]
entry = [reference, new_reference, @data_set, TRUE, renamed]
page_builder.add(entry)
else
@logger.info("Discarding local change on '#{data_stream["path"]}'", print=TRUE) if task['discard_changes']
@logger.info("Getting content from remote #{ref}", print=TRUE)
n_chunk = 0
@wendelin.eachDataStreamContentChunk(id, @chunk_size) do |chunk|
@logger.info("Getting content from remote #{reference}", print=TRUE)
@logger.info("Downloading...", print=TRUE)
resume_split = @dataset_utils.splitOperationFileExist(reference) ? @dataset_utils.getLastSplitOperation(DatasetUtils::DOWNLOAD, reference, hash, @chunk_size) : 0
n_chunk = resume_split == 0 ? 0 : resume_split+1
split = n_chunk > 0
@logger.info("Resuming interrupted split download...", print=TRUE) if split
@wendelin.eachDataStreamContentChunk(id, @chunk_size, n_chunk) do |chunk|
content = chunk.nil? || chunk.empty? ? "" : Base64.encode64(chunk)
begin_of_file = n_chunk == 0
entry = [ref, content, @data_set, begin_of_file, renamed]
split = n_chunk > 0
@dataset_utils.createSplitOperationControlFile(reference) if split
entry = [reference, content, @data_set, begin_of_file, renamed]
page_builder.add(entry)
@dataset_utils.saveSplitOperation(DatasetUtils::DOWNLOAD, reference, n_chunk, hash, @chunk_size) if split
n_chunk += 1
end
end
page_builder.finish
@dataset_utils.deleteSplitOperationFile(reference) if split
rescue java.lang.OutOfMemoryError
@logger.logOutOfMemoryError(ref)
@logger.logOutOfMemoryError(reference)
return_value = DatasetUtils::RUN_ABORTED
rescue Exception => e
@logger.error(e.to_s, print=TRUE)
......@@ -291,14 +301,15 @@ module Embulk
else
return_value = DatasetUtils::RUN_DONE
end
@dataset_utils.deleteSplitOperationControlFile(reference) if return_value != DatasetUtils::RUN_DONE
if return_value == DatasetUtils::RUN_DONE
if deleted
@dataset_utils.deleteFromReport(ref, return_value)
@dataset_utils.deleteFromReport(reference, return_value)
else
@dataset_utils.addToReport(ref, return_value, size, hash, task['data_set'], new_reference)
@dataset_utils.addToReport(reference, return_value, size, hash, task['data_set'], new_reference)
end
end
return {return_value => ref}
return {return_value => reference}
end
end
end
......
......@@ -18,18 +18,31 @@ module Embulk
"data_set" => config.param("data_set", :string, default: nil),
"erp5_base_url" => config.param("erp5_base_url", :string, default: nil)
}
storage_ingestion = ! task["type_input"]
task_reports = yield(task)
next_config_diff = {}
@logger = LogManager.instance()
@dataset_utils = DatasetUtils.new(Dir.pwd)
if task_reports.length > 0
@logger.info("Your ingested files will be available in the site in a few minutes. Thank for your patience.", print=TRUE)
# if ingestion was done from a storage different than filesystem, increase dataset version
if not task["type_input"] and task["data_set"] and task["erp5_base_url"]
if storage_ingestion
done_tasks = task_reports.map{|hash| hash[DatasetUtils::RUN_DONE]}.flatten.compact
@dataset_utils.showTaskReport(done_tasks)
@logger.info("#{done_tasks.length} new file(s) ingested.", print=TRUE)
if(done_tasks.length == count)
@logger.info("Dataset successfully ingested.", print=TRUE)
@wendelin = WendelinClient.new(task["erp5_base_url"], task["user"], task["password"])
@wendelin.increaseDatasetVersion(task["data_set"])
else
failed_tasks = task_reports.map{|hash| hash[DatasetUtils::RUN_ERROR] || hash[DatasetUtils::RUN_ABORTED] }.flatten.compact
@dataset_utils.showTaskErrors(failed_tasks, storage=TRUE)
end
end
else
@logger.info("No new files where processed for ingestion.", print=TRUE)
@logger.info("No new files found for ingestion.", print=TRUE)
if storage_ingestion
@logger.info("You can restart the ingestion from this storage by running your command again with parameter --restart .", print=TRUE)
end
end
return next_config_diff
end
......@@ -43,11 +56,9 @@ module Embulk
@wendelin = WendelinClient.new(@erp5_url, @user, @password)
end
def close
end
def add(page)
page.each do |record|
@return_value = DatasetUtils::RUN_ERROR
supplier = (record[0].nil? || record[0].empty?) ? "default" : record[0]
dataset = (record[1].nil? || record[1].empty?) ? "default" : record[1]
filename = record[2]
......@@ -58,23 +69,28 @@ module Embulk
hash = record[7]
begin
if eof == DatasetUtils::DELETE
reference = [dataset, filename, extension].join(DatasetUtils::REFERENCE_SEPARATOR)
@wendelin.delete(reference)
@reference = [dataset, filename, extension].join(DatasetUtils::REFERENCE_SEPARATOR)
@wendelin.delete(@reference)
elsif eof == DatasetUtils::RENAME
reference = [dataset, filename, extension].join(DatasetUtils::REFERENCE_SEPARATOR)
@wendelin.rename(reference, record[4].to_s)
@reference = [dataset, filename, extension].join(DatasetUtils::REFERENCE_SEPARATOR)
@wendelin.rename(@reference, record[4].to_s)
else
reference = [supplier, dataset, filename, extension, eof, size, hash].join(DatasetUtils::REFERENCE_SEPARATOR)
@reference = [supplier, dataset, filename, extension, eof, size, hash].join(DatasetUtils::REFERENCE_SEPARATOR)
split = eof != ""
if not @wendelin.ingest(reference, data_chunk, split)
raise "could not ingest"
ingestion_response = @wendelin.ingest(@reference, data_chunk, split)
if not ingestion_response["success"]
raise ingestion_response["message"]
end
end
rescue Exception => e
raise e
@logger.error(e.backtrace)
raise e
end
@return_value = DatasetUtils::RUN_DONE
end
end
def close
end
def finish
......@@ -84,7 +100,7 @@ module Embulk
end
def commit
task_report = {}
task_report = {@return_value => @reference}
return task_report
end
end
......
require_relative '../filelogger'
require_relative '../dataset_utils'
require_relative '../wendelin_client'
class Index
include Singleton
......@@ -53,8 +54,10 @@ module Embulk
def run(file_input)
@index = Index.instance().get()
@logger = LogManager.instance()
split = FALSE
while file = file_input.next_file
begin
@dataset_utils = DatasetUtils.new(Dir.pwd)
metadata_file = Dir.pwd + METADATA_FILE_NAME
metadata = File.open(metadata_file) {|f| f.readline} if File.exist?(metadata_file)
File.delete(metadata_file) if File.exist?(metadata_file)
......@@ -72,43 +75,65 @@ module Embulk
filename = "file_from_#{task['input_plugin']}_#{task['date']}"
extension = @index.to_s.rjust(3, "0")
end
each_chunk(file, filename.chomp, extension.chomp, task['chunk_size']) do |record|
reference = [task['data_set'], filename, extension.chomp].join(DatasetUtils::REFERENCE_SEPARATOR)
resume_split = @dataset_utils.splitOperationFileExist(reference) ? @dataset_utils.getLastSplitOperation(DatasetUtils::INGESTION, reference, "", task['chunk_size']) : 0
each_chunk(file, filename.chomp, extension.chomp, task['chunk_size'], resume_split) do |record|
@dataset_utils.createSplitOperationControlFile(reference) if split
@page_builder.add(record)
if record[5] != ""
split = TRUE
@dataset_utils.saveSplitOperation(DatasetUtils::INGESTION, reference, record[5], "", task['chunk_size'])
end
end
@page_builder.finish
Index.instance().increase()
@dataset_utils.deleteSplitOperationFile(reference) if split
rescue java.lang.OutOfMemoryError
@logger.logOutOfMemoryError(path)
@logger.logOutOfMemoryError(reference)
@dataset_utils.deleteSplitOperationControlFile(reference)
@logger.abortExecution()
rescue Exception => e
@logger.error("An error occurred during file ingestion: " + e.to_s, print=TRUE)
@logger.error(e.backtrace)
puts "[INFO] For more detailed information, please refer to the log file: " + @logger.getLogPath()
@dataset_utils.deleteSplitOperationControlFile(reference)
@logger.abortExecution(error=FALSE) if (e.to_s.include? '401' or e.to_s.include? 'Unauthorized')
raise e if not e.to_s.include? WendelinClient::HTTP_REFERENCE_EXIST
end
end
end
private
def each_chunk(file, filename, extension, chunk_size=DatasetUtils::CHUNK_SIZE)
def each_chunk(stream, filename, extension, chunk_size=DatasetUtils::CHUNK_SIZE, resume_split=0)
npart = 0
next_byte = file.read(1)
next_byte = stream.read(1)
first = TRUE
while true
data = next_byte
if not next_byte
if first
# this means this is an empty file
# this means this is an empty stream
values = [task['supplier'], task['data_set'], filename, extension, "", "", "", ""]
yield(values)
end
break
end
data += file.read(chunk_size)
next_byte = file.read(1)
if resume_split > 0
@logger.warn("Large files resuming could take some minutes", print=TRUE) if resume_split > 10
@logger.info("Resuming interrupted split ingestion...", print=TRUE)
first = FALSE
discardIngestedPart(stream, resume_split, chunk_size)
next_byte = stream.read(1)
data = next_byte
npart = resume_split
resume_split = 0
end
data += stream.read(chunk_size)
next_byte = stream.read(1)
if not next_byte
eof = DatasetUtils::EOF
if first
# this means that the whole file will be ingested at once (not split)
# this means that the whole stream will be ingested at once (not split)
eof = ""
end
else
......@@ -122,6 +147,16 @@ module Embulk
end
end
def discardIngestedPart(stream, resume_split, chunk_size)
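# The storage input stream cannot seek, so the part already ingested is read and
# thrown away: resume_split chunks of chunk_size bytes plus resume_split-1 extra
# bytes, which together with the lookahead byte already consumed by the caller add
# up to the resume_split*(chunk_size+1) bytes covered by the previous run; the
# sleep only paces these throw-away reads.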
read = 0
while read < resume_split
stream.read(chunk_size)
read += 1
sleep 1
end
stream.read(resume_split-1)
end
end
end
end
......@@ -7,6 +7,14 @@ require_relative 'filelogger'
# class representing a Wendelin client
class WendelinClient
HTTP_MESSAGE_401 = "Unauthorized access. Please check your user credentials and try again."
HTTP_MESSAGE_5XX = "Internal Server Error: if the error persists, please contact the administrator."
HTTP_MESSAGE_OTHER = "Sorry, an error ocurred. If the error persists, please contact the administrator."
HTTP_MESSAGE_NOT_2XX = "HTTP-NOT-OK"
HTTP_MESSAGE_EXCEPTION = "HTTP-ERROR"
HTTP_MEMORY_ERROR = "MEMORY-ERROR"
HTTP_REFERENCE_EXIST = "REFERENCE-EXIST"
def initialize(erp5_url, user, password)
@erp5_url = erp5_url
@user = user
......@@ -44,7 +52,7 @@ class WendelinClient
checkReferenceChars(reference)
uri = URI(URI.escape("#{@erp5_url}/ERP5Site_invalidateIngestionObjects?reference=#{reference}"))
res = handleRequest(uri)
if res == FALSE
if res["success"] == FALSE
@logger.abortExecution()
end
@logger.info("Remote file successfully ingested.", print=TRUE)
......@@ -56,7 +64,7 @@ class WendelinClient
checkReferenceChars(new_reference)
uri = URI(URI.escape("#{@erp5_url}/ERP5Site_renameIngestion?reference=#{reference}&new_reference=#{new_reference}"))
res = handleRequest(uri)
if res == FALSE
if res["success"] == FALSE
@logger.abortExecution()
end
@logger.info("Remote file successfully renamed.", print=TRUE)
......@@ -67,8 +75,8 @@ class WendelinClient
@logger.warn("Could not increase data set version because dataset reference is empty.")
else
@logger.info("Increasing dataset version")
uri = URI(URI.escape("#{@erp5_url}/ERP5Site_increaseDatasetVersion?reference=#{reference}"))
begin
uri = URI(URI.escape("#{@erp5_url}/ERP5Site_increaseDatasetVersion?reference=#{reference}"))
res = open(uri, http_basic_authentication: [@user, @password]).read
rescue Exception => e
@logger.error("An error occurred while increasing dataset version: " + e.to_s)
......@@ -87,46 +95,77 @@ class WendelinClient
@logger.info("There is another ingestion already done for the pair dataset-filename. Reference "\
+ reference, print=TRUE)
@logger.info("Rename your file or download the full dataset to make local changes.", print=TRUE)
return FALSE
return {"success"=>FALSE, "message"=>HTTP_REFERENCE_EXIST}
end
checkReferenceChars(reference)
uri = URI(URI.escape("#{@erp5_url}/ingest?reference=#{reference}"))
n_retry = 0
response = handleRequest(uri, reference, data_chunk)
if response == FALSE
return FALSE
while response["success"] == FALSE && n_retry < 10
if response["message"] != HTTP_MESSAGE_EXCEPTION && response["message"] != HTTP_MESSAGE_NOT_2XX
return response
else
n_retry += 1
@logger.info("Retrying #{n_retry}/10...", print=TRUE)
sleep 30
response = handleRequest(uri, reference, data_chunk)
end
end
if response["success"] == FALSE
return response
end
@logger.info("Record successfully ingested.", print=TRUE)
@last_ingestion = Time.new
return TRUE
return {"success"=>TRUE, "message"=>"success"}
end
def eachDataStreamContentChunk(id, chunk_size)
uri = URI(URI.escape("#{@erp5_url}#{id}/getData"))
@logger.info("Downloading...", print=TRUE)
def eachDataStreamContentChunk(id, chunk_size, n_chunk=0)
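# Fetch the data stream in chunk_size slices via ERP5's getDataStreamChunk
# (start_offset/end_offset), starting at chunk n_chunk so that an interrupted
# download can resume; each slice is retried up to 10 times, 30 seconds apart,
# before the last exception is re-raised.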
n_part = n_chunk
done = FALSE
first = TRUE
while not done
start_offset = n_part*chunk_size
end_offset = n_part*chunk_size+chunk_size
uri = URI(URI.escape("#{@erp5_url}getDataStreamChunk?id=#{id}&start_offset=#{start_offset}&end_offset=#{end_offset}"))
success = FALSE
n_retry = 0
while ! success && n_retry < 10
begin
res = open(uri, http_basic_authentication: [@user, @password]) {
|content|
while true
chunk = content.read(chunk_size)
if chunk.nil?
chunk = content.read()
if chunk.nil? || chunk.empty?
if first
yield chunk
end
@logger.info("Done", print=TRUE)
break
end
done = TRUE
else
first = FALSE
n_part += 1
yield chunk
end
}
success = TRUE
rescue Exception => e
exception = e
@logger.error("Error downloading data: " + e.to_s, print=TRUE)
@logger.error(e.backtrace)
n_retry += 1
@logger.info("Retrying #{n_retry}/10...", print=TRUE)
sleep 30
end
end
raise exception if not success
end
end
def getDataStreams(data_set_reference)
uri = URI(URI.escape("#{@erp5_url}getDataStreamList?data_set_reference=#{data_set_reference}"))
str = handleRequest(uri)
if str == FALSE
response = handleRequest(uri)
if response["success"] == FALSE
@logger.abortExecution()
end
str = response["message"]
if not str.nil?
str.gsub!(/(\,)(\S)/, "\\1 \\2")
return YAML::load(str)
......@@ -145,11 +184,14 @@ class WendelinClient
req.set_form_data('data_chunk' => data_chunk)
rescue java.lang.OutOfMemoryError
@logger.logOutOfMemoryError(reference)
return FALSE
return {"success"=>FALSE, "message"=>HTTP_MEMORY_ERROR}
rescue Exception => e
@logger.error("Error setting form data: " + e.to_s, print=TRUE)
@logger.error(e.backtrace)
return {"success"=>FALSE, "message"=>HTTP_MESSAGE_EXCEPTION}
end
@logger.info("Sending record:'#{reference}'...", print=TRUE)
end
begin
res = Net::HTTP.start(uri.hostname, uri.port,
:use_ssl => (uri.scheme == 'https'),
......@@ -161,22 +203,25 @@ class WendelinClient
rescue Exception => e
@logger.error("HTTP ERROR: " + e.to_s, print=TRUE)
@logger.error(e.backtrace)
return FALSE
return {"success"=>FALSE, "message"=>HTTP_MESSAGE_EXCEPTION}
else
if res.kind_of?(Net::HTTPSuccess) # res.code is 2XX
@logger.info("Done")
return res.body
return {"success"=>TRUE, "message"=>res.body}
else
@logger.error("HTTP FAIL - code: #{res.code}", print=TRUE)
if res.code == '500' or res.code == '502' or res.code == '503'
@logger.error("Internal Server Error: if the error persists, please contact the administrator.", print=TRUE)
@logger.error(HTTP_MESSAGE_5XX, print=TRUE)
elsif res.code == '401'
@logger.error("Unauthorized access. Please check your user credentials and try again.", print=TRUE)
@logger.error(HTTP_MESSAGE_401, print=TRUE)
@logger.abortExecution()
elsif res.code == '400'
@logger.error(HTTP_MESSAGE_400, print=TRUE)
@logger.abortExecution()
else
@logger.error("Sorry, an error ocurred. If the error persists, please contact the administrator.", print=TRUE)
@logger.error(HTTP_MESSAGE_OTHER, print=TRUE)
end
return FALSE
return {"success"=>FALSE, "message"=>HTTP_MESSAGE_NOT_2XX}
end
end
end
......