Commit 7d2cc57d authored by Roque

Use new data-stream-list server script data structure

- addToReport method saves the new large-hash parameter
- delete from report updated
- split download updated
- resume operation (both full and split files) updated
- warn conflicts updated
- update dataset operation updated
- rename operation updated (download and ingest)
- download discard changes updated
- stage feature works properly (add/remove local changes)
- ingestion reportUpToDate updated
- split ingestion updated
- dataset utils: remove hardcoded url
- report up to date method updated
parent 3c00a569
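
For orientation, a minimal sketch of the per-file entry shape this commit assumes for task['data_streams'] under the new data-stream-list server script. The keys mirror the ones read in the download diff below; every value, the sample reference and the chunk ids are invented for illustration.

# Hypothetical example of one entry in task['data_streams'] under the new
# data-stream-list structure; keys match those read in the diff below,
# values are made up for illustration.
remote_file = {
  "reference"        => "supplier/my_dataset/big_file/csv",
  "path"             => "/path/to/dataset/big_file.csv",
  "status"           => nil,                 # DatasetUtils::STATUS_RENAMED for renames
  "new_reference"    => nil,                 # only present for renamed files
  "full-size"        => 157_286_400,         # size of the whole file in bytes
  "large-hash"       => "3a7bd3e2360a...",   # file-level hash built from the per-chunk hashes
  "data-stream-list" => [                    # one data stream per ingested chunk, in order
    { "id" => "data_stream_module/001" },
    { "id" => "data_stream_module/002" },
    { "id" => "data_stream_module/003" }
  ]
}

Keeping one data stream per chunk is what lets the new download loop resume by index and skip chunks that are already on disk.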
@@ -138,9 +138,9 @@ module Embulk
           @logger.error("Your current dataset is outdated. Please, run a download to update it before ingest your changes.", print=TRUE)
           puts
           @logger.abortExecution(error=FALSE)
         end
       end
     end
     @logger.info("Supplier: #{task['supplier']}")
     @logger.info("Dataset name: #{task['data_set']}")
@@ -215,7 +215,7 @@ module Embulk
       super
       @supplier = task['supplier']
       @dataset = task['data_set']
-      @chunk_size = task['chunk_size']
+      @chunk_size = DatasetUtils::CHUNK_SIZE
       @data_set_directory = task['data_set_directory']
       @logger = LogManager.instance()
       @dataset_utils = DatasetUtils.new(@data_set_directory)
@@ -239,13 +239,19 @@ module Embulk
       filename, extension, reference = @dataset_utils.getPathInfo(path, @dataset)
       operation = rename ? DatasetUtils::RENAME : DatasetUtils::INGESTION
       @dataset_utils.saveCurrentOperation(operation, reference, new_reference)
-      resume_split = @dataset_utils.splitOperationFileExist(reference) ? @dataset_utils.getLastSplitOperation(operation, reference, hash, @chunk_size) : 0
+      resume_split, large_hash = 0, ""
+      if @dataset_utils.splitOperationFileExist(reference)
+        resume_split, large_hash = @dataset_utils.getLastSplitOperation(operation, reference, hash, @chunk_size, large_hash=TRUE)
+      end
       each_chunk(path, filename, extension, size, hash, schema[1..-1].map{|elm| elm.name}, @chunk_size, delete, new_reference, resume_split) do |entry|
         @dataset_utils.createSplitOperationControlFile(reference) if split
+        large_hash += entry[8]
+        #no need to send large hash to server
+        entry.pop()
         @page_builder.add(entry)
         if ! delete && ! rename && entry[5] != ""
           split = TRUE
-          @dataset_utils.saveSplitOperation(operation, reference, entry[5], hash, @chunk_size)
+          @dataset_utils.saveSplitOperation(operation, reference, entry[5], hash, @chunk_size, large_hash)
         end
       end
       @page_builder.finish
@@ -269,7 +275,7 @@ module Embulk
         end
       else
         if @dataset_utils.reportFileExist()
-          @dataset_utils.addToReport(reference, return_value, size, hash, task['data_set'], new_reference)
+          @dataset_utils.addToReport(reference, return_value, size, hash, large_hash, task['data_set'], new_reference)
         end
       end
     end
@@ -282,11 +288,11 @@ module Embulk
     def each_chunk(path, filename, extension, size, hash, fields, chunk_size=DatasetUtils::CHUNK_SIZE, delete=FALSE, new_reference=FALSE, resume_split=0)
       if delete
         File.delete(path) if File.exist?(path)
-        values = [@supplier, @dataset, filename, extension, "", DatasetUtils::DELETE, "", ""]
+        values = [@supplier, @dataset, filename, extension, "", DatasetUtils::DELETE, "", "", ""]
         yield(values)
       elsif new_reference
         File.delete(path) if File.exist?(path)
-        values = [@supplier, @dataset, filename, extension, new_reference, DatasetUtils::RENAME, "", ""]
+        values = [@supplier, @dataset, filename, extension, new_reference, DatasetUtils::RENAME, "", "", ""]
         yield(values)
       else
         file_object = File.open(path, "rb")
@@ -297,7 +303,7 @@ module Embulk
           data = next_byte
           if not next_byte
             if first # this means this is an empty file
-              values = [@supplier, @dataset, filename, extension, "", "", size, hash]
+              values = [@supplier, @dataset, filename, extension, "", "", size, hash, hash]
              yield(values)
            end
            break
@@ -320,7 +326,8 @@ module Embulk
            eof = npart.to_s.rjust(3, "0")
          end
          content = Base64.encode64(data)
-          values = [@supplier, @dataset, filename, extension, content, eof, size, hash]
+          chunk_hash = @dataset_utils.getHashFromChunk(data)
+          values = [@supplier, @dataset, filename, extension, content, eof, size, hash, chunk_hash]
          first = FALSE
          yield(values)
        end
...
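
The ingestion-side change above hashes each chunk (entry[8]), accumulates those per-chunk hashes into a file-level "large hash", and pops the chunk hash before the row reaches the page builder. Below is a standalone sketch of that bookkeeping, not the plugin code itself: Digest::SHA256 stands in for DatasetUtils.getHashFromChunk (whose digest is not shown in this diff), and CHUNK_SIZE and the sample path are invented.

# Minimal sketch of the new per-chunk hash bookkeeping, assuming SHA-256 and
# a made-up chunk size; in the plugin the chunk hash is entry[8].
require "digest"
require "base64"

CHUNK_SIZE = 50 * 1024 * 1024  # assumed value, mirroring DatasetUtils::CHUNK_SIZE

def hash_from_chunk(data)
  Digest::SHA256.hexdigest(data)  # stand-in for @dataset_utils.getHashFromChunk(data)
end

def each_chunk(path, chunk_size = CHUNK_SIZE)
  File.open(path, "rb") do |file_object|
    npart = 0
    until file_object.eof?
      data = file_object.read(chunk_size)
      # simplified end-of-file marker; the plugin's eof handling is richer
      eof = file_object.eof? ? "EOF" : npart.to_s.rjust(3, "0")
      yield [Base64.encode64(data), eof, File.size(path), hash_from_chunk(data)]
      npart += 1
    end
  end
end

large_hash = ""
each_chunk("/tmp/sample.bin") do |entry|   # hypothetical sample file
  large_hash += entry.last                 # accumulate chunk hashes into the file-level large hash
  entry.pop                                # the chunk hash itself is not sent to the server
  # page_builder.add(entry) would happen here in the real plugin
end
puts large_hash

In the plugin, that accumulated large_hash is what saveSplitOperation persists and what getLastSplitOperation returns together with the resume index, so an interrupted split ingestion restarts with the hash state of the last saved chunk and addToReport can record it locally.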
@@ -242,7 +242,7 @@ module Embulk
     def initialize(task, schema, index, page_builder)
       super
       @data_set = task['data_set']
-      @chunk_size = task['chunk_size']
+      @chunk_size = DatasetUtils::CHUNK_SIZE
       @data_set_directory = task['data_set_directory']
       @wendelin = WendelinClient.new(task['erp5_url'], task['user'], task['password'])
       @logger = LogManager.instance()
@@ -250,46 +250,52 @@ module Embulk
     end
     def run
-      data_stream = task['data_streams'][@index]
-      id = data_stream["id"]
-      reference = data_stream["reference"]
-      size = data_stream["size"]
-      hash = data_stream["hash"]
-      renamed = data_stream["status"] == DatasetUtils::STATUS_RENAMED
-      deleted = hash.to_s == DatasetUtils::DELETE
+      remote_file = task['data_streams'][@index]
+      reference = remote_file["reference"]
+      size = remote_file["full-size"]
+      large_hash = remote_file["large-hash"]
+      data_stream_chunk_list = remote_file["data-stream-list"]
+      renamed = remote_file["status"] == DatasetUtils::STATUS_RENAMED
+      deleted = large_hash.to_s == DatasetUtils::DELETE
       begin
         if deleted
           entry = [reference, "", @data_set, DatasetUtils::DELETE, renamed]
           page_builder.add(entry)
         elsif renamed
-          new_reference = data_stream["new_reference"]
+          new_reference = remote_file["new_reference"]
           entry = [reference, new_reference, @data_set, TRUE, renamed]
           page_builder.add(entry)
         else
-          @logger.info("Discarding local change on '#{data_stream["path"]}'", print=TRUE) if task['discard_changes']
+          @logger.info("Discarding local change on '#{remote_file["path"]}'", print=TRUE) if task['discard_changes']
           @logger.info("Getting content from remote #{reference}", print=TRUE)
           @logger.info("Downloading...", print=TRUE)
-          resume_split = @dataset_utils.splitOperationFileExist(reference) ? @dataset_utils.getLastSplitOperation(DatasetUtils::DOWNLOAD, reference, hash, @chunk_size) : 0
+          resume_split = @dataset_utils.splitOperationFileExist(reference) ? @dataset_utils.getLastSplitOperation(DatasetUtils::DOWNLOAD, reference, large_hash, @chunk_size) : 0
          n_chunk = resume_split == 0 ? 0 : resume_split+1
          split = n_chunk > 0
          @logger.info("Resuming interrupted split download...", print=TRUE) if split
-          @wendelin.eachDataStreamContentChunk(id, @chunk_size, n_chunk) do |chunk|
-            content = chunk.nil? || chunk.empty? ? "" : Base64.encode64(chunk)
-            begin_of_file = n_chunk == 0
-            split = n_chunk > 0
-            @dataset_utils.createSplitOperationControlFile(reference) if split
-            entry = [reference, content, @data_set, begin_of_file, renamed]
-            page_builder.add(entry)
-            @dataset_utils.saveSplitOperation(DatasetUtils::DOWNLOAD, reference, n_chunk, hash, @chunk_size) if split
-            n_chunk += 1
-          end
+          data_stream_chunk_list.each_with_index do |data_stream_chunk, index|
+            #skip datastreams/chunks already downloaded
+            if n_chunk == index
+              content = ""
+              @wendelin.eachDataStreamContentChunk(data_stream_chunk["id"], @chunk_size, 0) do |chunk|
+                content = chunk.nil? || chunk.empty? ? "" : Base64.encode64(chunk)
+              end
+              begin_of_file = n_chunk == 0
+              split = n_chunk > 0
+              @dataset_utils.createSplitOperationControlFile(reference) if split
+              entry = [reference, content, @data_set, begin_of_file, renamed]
+              page_builder.add(entry)
+              @dataset_utils.saveSplitOperation(DatasetUtils::DOWNLOAD, reference, n_chunk, large_hash, @chunk_size) if split
+              n_chunk += 1
+            end
+          end
        end
        page_builder.finish
        @dataset_utils.deleteSplitOperationFile(reference) if split
      rescue java.lang.OutOfMemoryError
        @logger.logOutOfMemoryError(reference)
        return_value = DatasetUtils::RUN_ABORTED
      rescue Exception => e
        @logger.error(e.to_s, print=TRUE)
        @logger.error(e.backtrace)
        puts "[INFO] For more detailed information, please refer to the log file: " + @logger.getLogPath()
@@ -302,7 +308,9 @@ module Embulk
        if deleted
          @dataset_utils.deleteFromReport(reference, return_value)
        else
-          @dataset_utils.addToReport(reference, return_value, size, hash, task['data_set'], new_reference)
+          file_path = renamed ? @dataset_utils.referenceToPath(new_reference, @data_set_directory, @data_set) : @dataset_utils.referenceToPath(reference, @data_set_directory, @data_set)
+          hash = @dataset_utils.getHash(file_path).to_s
+          @dataset_utils.addToReport(reference, return_value, size, hash, large_hash, task['data_set'], new_reference)
        end
      end
      return {return_value => reference}
...
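
The download side now walks the data-stream-list one chunk data stream at a time and skips indices below the recorded resume point, instead of reading a single stream from an offset. A standalone sketch of that loop under stated assumptions: FakeClient and its method are placeholders for WendelinClient#eachDataStreamContentChunk, and the sample ids are invented.

# Sketch of the resumable per-chunk download loop; not the plugin code itself.
require "base64"

class FakeClient
  # stands in for @wendelin.eachDataStreamContentChunk(id, @chunk_size, 0)
  def each_data_stream_content_chunk(id)
    yield "content of #{id}"
  end
end

def download(remote_file, resume_split: 0, client: FakeClient.new)
  n_chunk = resume_split.zero? ? 0 : resume_split + 1
  remote_file["data-stream-list"].each_with_index do |data_stream_chunk, index|
    next unless n_chunk == index            # skip chunks downloaded before the interruption
    content = ""
    client.each_data_stream_content_chunk(data_stream_chunk["id"]) do |chunk|
      content = chunk.nil? || chunk.empty? ? "" : Base64.encode64(chunk)
    end
    begin_of_file = n_chunk.zero?
    puts "chunk #{index} (begin_of_file=#{begin_of_file}): #{content[0, 16].inspect}"
    # in the plugin: page_builder.add(entry) and saveSplitOperation(...) happen here
    n_chunk += 1
  end
end

download(
  { "data-stream-list" => [{ "id" => "ds/001" }, { "id" => "ds/002" }, { "id" => "ds/003" }] },
  resume_split: 0
)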