#! /usr/bin/env bash
DOWN_URL='https://softinst104003.host.vifib.net/erp5/'
ING_URL='https://softinst104003.host.vifib.net/erp5/portal_ingestion_policies/wendelin_embulk'
EBULK_DATA_PATH=~/.ebulk
LOG_DIR="$EBULK_DATA_PATH/logs"
TOOL_PATH="$(dirname "$0")/ebulk-data"
DOWN_FILE="$EBULK_DATA_PATH/download-config.yml"
DOWN_TEMPLATE_FILE="$TOOL_PATH/config/download-config_template.yml"
ING_FILE="$EBULK_DATA_PATH/ingestion-config.yml"
ING_TEMPLATE_FILE="$TOOL_PATH/config/ingestion-config_template.yml"
CUSTOM_ING_FILE="$EBULK_DATA_PATH/ingestion-custom-config.yml"
CUSTOM_ING_TEMPLATE_FILE="$TOOL_PATH/config/ingestion-custom-config_template.yml"
S3_ING_FILE="$EBULK_DATA_PATH/ingestion-s3-config.yml"
S3_ING_TEMPLATE_FILE="$TOOL_PATH/config/ingestion-s3-config_template.yml"
HTTP_ING_FILE="$EBULK_DATA_PATH/ingestion-http-config.yml"
HTTP_ING_TEMPLATE_FILE="$TOOL_PATH/config/ingestion-http-config_template.yml"
FTP_ING_FILE="$EBULK_DATA_PATH/ingestion-ftp-config.yml"
FTP_ING_TEMPLATE_FILE="$TOOL_PATH/config/ingestion-ftp-config_template.yml"
GREEN='\033[0;32m'
ORANGE='\033[0;33m'
NC='\033[0m'
DEFAULT_CHUNK_SIZE="50"
function helpReadme {
echo -e "[INFO] For help, please run '${GREEN}ebulk --help${NC}'"
echo -e "[INFO] For README, please run '${GREEN}ebulk --readme${NC}'"
echo
}
function checkParameters {
if [ ! -f $TEMPLATE_FILE ]; then
echo
echo -e "${ORANGE}[ERROR] File '$TEMPLATE_FILE' not found!${NC}" >&2; return 1
fi
if [ -z "$DATASET_DIR" ]; then
echo
echo -e "${ORANGE}[ERROR] Dataset path not specified."
echo -e "[INFO] Please specify a valid dataset path.${NC}"
echo
helpReadme >&2; return 1
fi
if [ "$STORAGE" = "" ] ; then
if [ ! -d $DATASET_DIR ]; then
echo
mkdir $DATASET_DIR 2>/dev/null
if [ ! $? -eq 0 ]; then
echo
echo -e "${ORANGE}[ERROR] Dataset path not found."
echo -e "[INFO] Please specify a valid dataset path.${NC}"
echo
helpReadme >&2; return 1
fi
fi
EBULK_DATASET_FILE="$DATASET_DIR/.ebulk_dataset"
if [[ $DATASET_DIR != $REFERENCE ]]; then
DATA_SET=$REFERENCE
echo $REFERENCE > $EBULK_DATASET_FILE 2>/dev/null
else
if [ -f $EBULK_DATASET_FILE ]; then
DATA_SET=$(cat "$DATASET_DIR/.ebulk_dataset")
else
DATA_SET=$(basename "$DATASET_DIR")
echo $DATA_SET > $EBULK_DATASET_FILE 2>/dev/null
fi
fi
else
DATA_SET=$DATASET_DIR
fi
re='^[A-Za-z][_A-Za-z.0-9-]*$'
if ! [[ $DATA_SET =~ $re ]] ; then
echo
echo -e "${ORANGE}[ERROR] Error in argument: invalid dataset name ${GREEN}'$DATA_SET'${ORANGE}.${NC}"
echo -e "${ORANGE}[ERROR] Dataset name must start with a letter, and only alphanumerics, dots ( . ), underscores ( _ ) and hyphens ( - ) are allowed.${NC}"
echo
if [ -f $EBULK_DATASET_FILE ]; then
rm -f ${EBULK_DATASET_FILE}
fi
helpReadme >&2; return 1
fi
if [ ! -z "$CHUNK" ]; then
re='^[0-9]+$'
if ! [[ $CHUNK =~ $re ]] ; then
echo
echo -e "${ORANGE}[ERROR] Error in argument: chunk size must be an integer.${NC}"
echo
helpReadme >&2; return 1
fi
fi
}
function askCredentials {
echo
echo "Please, enter your ebulk user and password:"
echo
echo "** You can use the same Telecom-Wendelin-IA site user and password **"
echo "** If you don't have a user, please feel free to request one to roqueporchetto@gmail.com **"
echo
echo "User:"
read -e USER
re='^[A-Za-z][-A-Za-z0-9_]*$'
if ! [[ $USER =~ $re ]] ; then
echo
echo -e "${ORANGE}[ERROR] Invalid user name. Did enter an invalid character?"
echo -e "[INFO] Please enter a valid user name.${NC}"
echo >&2; return 1
fi
read -s -e -p "Password: " pwd
}
function updateConfigFile {
if [ "$STORAGE" != "" ] ; then
echo
echo "** You have chosen $STORAGE storage, please enter the information needed to configure it **"
echo
OPTION=""
if [ "$ADVANCED" = true ] ; then
echo "[INFO] If you want to edit the configuration file by yourself (advanced) please type CONFIG, otherwise press enter to continue."
read -e OPTION
if [ "$OPTION" = "CONFIG" ] ; then
CUSTOM=true
fi
fi
if [ "$OPTION" != "CONFIG" ] ; then
$PARAMETER_FUNCTION
if [ "$ADVANCED" = false ] ; then
echo "** If you need a more advanced storage configuration, you can run the tool with the parameter --advanced **"
sleep 1
fi
fi
DIFF="-c $EBULK_DATA_PATH/$STORAGE-$USER-$DATA_SET-diff.yml"
else
$PARAMETER_FUNCTION
fi
TOOL_DIR=\"$LOG_DIR\"
DATA_SET=\"$DATA_SET\"
USER=\"$USER\"
pwd=\"$pwd\"
CHUNK=\"$CHUNK\"
DATASET_DIR=\"$DATASET_DIR\"
DOWN_URL=\"$DOWN_URL\"
ING_URL=\"$ING_URL\"
STORAGE=\"$STORAGE\"
S3_BUCKET=\"$S3_BUCKET\"
S3_PREFIX=\"$S3_PREFIX\"
S3_AUTH_METHOD=\"$S3_AUTH_METHOD\"
HTTP_URL=\"$HTTP_URL\"
HTTP_METHOD=\"$HTTP_METHOD\"
FTP_HOST=\"$FTP_HOST\"
FTP_USER=\"$FTP_USER\"
FTP_PASSWORD=\"$FTP_PASSWORD\"
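# Expand the re-quoted variables above into the YAML template via eval and write the result to the config file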
template="$(cat ${TEMPLATE_FILE})"
rm -f ${FILE}
eval "echo \"${template}\"" > ${FILE}
if [ "$CUSTOM" = true ] ; then
echo
echo "** Please, fill the configuration file according to the plugin."
echo
read -n 1 -s -r -p "Press any key to continue editing configuration file"
echo
vi "$FILE"
fi
}
function runProcess {
if ! checkSoftware; then
return 1
fi
if ! checkParameters; then
return 1
fi
echo -e "[INFO] Dataset: ${GREEN}$DATA_SET${NC}"
if [ ! -z "$CHUNK" ]; then
if [ "$CHUNK" -eq "0" ]; then
echo "[INFO] Default chunk size: $DEFAULT_CHUNK_SIZE Mb."
else
echo "[INFO] Chunk size set in $CHUNK Mb."
fi
fi
if ! askCredentials; then
return 1
fi
echo
echo "[INFO] Supplier: $USER"
updateConfigFile
echo "[INFO] Starting operation..."
if [ ! -d $LOG_DIR ]; then
mkdir $LOG_DIR 2>/dev/null
fi
$embulk run -L $TOOL_PATH/embulk-wendelin-dataset-tool $FILE $DIFF 2> "$LOG_DIR/error.log" || {
echo
echo -e "${ORANGE}[ERROR] Embulk tool stopped its execution.${NC}"
if [ "$STORAGE" != \"\" ] ; then
echo "[INFO] There was an error running Embulk tool, probably while connecting to storage, loading embulk gems or config files."
echo "[INFO] Please make sure your inputs are correct and configuration files are not corrupted."
fi
echo "[INFO] Please check the logs in '$LOG_DIR' directory for more details."
echo
}
}
function javaNotInstalled {
echo -e "${ORANGE}[ERROR] java 8 must be installed. Please, install java 8 and try again.${NC}"
echo "[INFO] You can download the installation package for your operative system clicking on the Download button on https://java.com/en/"
echo "[INFO] Please follow the instructions to install java 8 here https://java.com/en/download/help/download_options.xml"
echo
}
function checkCurl {
if ! hash curl 2>/dev/null; then
echo -e "${ORANGE}[ERROR] curl must be installed. Please, install curl and try again.${NC}"
echo "[INFO] To install curl, please run as root:"
echo "apt-get update"
echo "apt-get install curl" >&2; return 1
fi
}
function checkSoftware {
# CHECK JAVA VERSION
if type -p java >/dev/null; then
_java=java
elif [[ -n "$JAVA_HOME" ]] && [[ -x "$JAVA_HOME/bin/java" ]]; then
_java="$JAVA_HOME/bin/java"
else
javaNotInstalled >&2; return 1
fi
if [[ "$_java" ]]; then
version=$("$_java" -version 2>&1 | awk -F '"' '/version/ {print $2}')
echo "[INFO] Current java version: "$version""
if [[ "$version" < "1.8" ]]; then
javaNotInstalled >&2; return 1
fi
fi
# CHECK (AND INSTALL) EMBULK
embulk=~/.embulk/bin/embulk
if [ ! -f $embulk ]; then
echo -e "${ORANGE}[INFO] Embulk not installed. Installing Embulk...${NC}"
if ! checkCurl; then
return 1
fi
curl --create-dirs -o $embulk -L "https://dl.bintray.com/embulk/maven/embulk-0.9.7.jar"
chmod +x $embulk
if hash $embulk 2>/dev/null; then
echo "[INFO] Done. Embulk successfully installed."
else
echo
echo -e "${ORANGE}[ERROR] An error occurred during automatic Embulk installation.${NC}"
echo
echo "[INFO] Plase, try manual installation. Run:"
echo
echo 'curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.bintray.com/embulk/maven/embulk-0.9.7.jar"'
echo "chmod +x ~/.embulk/bin/embulk"
echo -e "echo 'export PATH=\"\$HOME/.embulk/bin:\$PATH\"' >> ~/.bashrc"
echo "source ~/.bashrc"
echo >&2; return 1
fi
else
version=$($embulk --version 2>&1 | awk -F ' ' '/embulk/ {print $2}')
echo "[INFO] Current embulk version: "$version""
if [[ "$version" < "0.9.7" ]]; then
echo "Your embulk version is too old. Upgrading embulk to version 0.9.7"
$embulk selfupdate 0.9.7
fi
fi
# CHECK STORAGE PLUGIN
checkStoragePlugin
}
function checkStoragePlugin {
if [ "$STORAGE_GEM" != "" ] ; then
echo -n "[INFO] Checking if '$STORAGE' plugin is installed... "
$embulk gem list 2>/dev/null | grep -q "$STORAGE_GEM" 2>/dev/null
if [ $? == 0 ]; then
echo -e "${GREEN}OK${NC}"
sleep 1
else
echo -e "${ORANGE}NO${NC}"
echo "[INFO] Installing '$STORAGE' plugin..."
$embulk gem install $STORAGE_GEM 2>/dev/null | grep -q "ERROR" 2>/dev/null
$embulk gem list 2>/dev/null | grep -q "$STORAGE_GEM" 2>/dev/null
if [ $? == 0 ]; then
echo "[INFO] '$STORAGE' plugin successfully installed."
else
echo -e "${ORANGE}[ERROR] Could not find a valid Embulk plugin (gem) '$STORAGE_GEM'.${NC}"
exit
fi
fi
fi
}
function askCustomParameters {
echo "Embulk input plugin (gem):"
read -e STORAGE_GEM
if [ "$STORAGE_GEM" = "" ] ; then
echo -e "${ORANGE}[ERROR] To configure your own storage, you must specify a valid Embulk plugin (gem).${NC}"
exit
fi
echo
STORAGE=$STORAGE_GEM
checkStoragePlugin
}
function askHTTPparameters {
echo "URL:"
read -e HTTP_URL
if [ "$HTTP_URL" = "" ] ; then
echo -e "${ORANGE}[ERROR] Empty URL.${NC}"
exit
fi
echo "HTTP method (get / post):"
echo "* you can leave this input empty and GET method will be used by default *"
read -e HTTP_METHOD
if [ "$HTTP_METHOD" = "POST" ] || [ "$HTTP_METHOD" = "post" ]; then
HTTP_METHOD="post"
elif [ "$HTTP_METHOD" = "GET" ] || [ "$HTTP_METHOD" = "get" ] || [ "$HTTP_METHOD" = "" ]; then
HTTP_METHOD="get"
else
echo -e "${ORANGE}[ERROR] Invalid http method. It must be get or post.${NC}"
exit
fi
}
function askFTPparameters {
echo "FTP Host:"
echo "* (e.g. ftp.aaa.bbb.gov) *"
read -e FTP_HOST
if [ "$FTP_HOST" = "" ] ; then
echo -e "${ORANGE}[ERROR] Empty host.${NC}"
exit
fi
echo "Path prefix:"
echo "* (e.g. /mydata/sample/dir/) *"
read -e FTP_PATH
if [ "$FTP_PATH" = "" ] ; then
FTP_PATH="/"
fi
echo "FTP User:"
echo "* you can leave this input empty and anonymous authentication will be used *"
read -e FTP_USER
if [ "$FTP_USER" = "" ]; then
FTP_USER="anonymous"
else
echo "FTP Password:"
read -e FTP_PASSWORD
if [ "$FTP_PASSWORD" = "" ] ; then
echo -e "${ORANGE}[ERROR] Empty password.${NC}"
exit
fi
fi
}
function askS3parameters {
S3_AUTH_METHOD="basic"
echo "Bucket name:"
read -e S3_BUCKET
if [ "$S3_BUCKET" = "" ] ; then
echo -e "${ORANGE}[ERROR] Empty bucket name.${NC}"
exit
fi
echo "Path prefix:"
echo "* you can leave this input empty and the full bucket will be used *"
read -e S3_PREFIX
echo "Access Key ID for authentication:"
echo "* you can leave this input empty so anonymous authentication is used *"
read -e S3_ACCESS_KEY
if [ "$S3_ACCESS_KEY" = "" ] ; then
S3_AUTH_METHOD="anonymous"
else
echo "Secret Access Key for authentication:"
read -e S3_SECRET_KEY
if [ "$S3_SECRET_KEY" = "" ] ; then
echo -e "${ORANGE}[ERROR] Empty Secret Access Key.${NC}"
exit
fi
echo
fi
}
# WELCOME
echo
echo " #########################################################################"
echo " ############## WELCOME TO EBULK INGESTION-DOWNLOAD TOOL #################"
echo " ########### This tool relies on Embulk software and Java 8 ##############"
echo " ######## Do not forget to check the README before use this tool #########"
echo " ############## In case of any problem, please contact us ###############"
echo " ####################### roqueporchetto@gmail.com ########################"
echo " ###################### Happy ingestion-download ! #######################"
echo " #########################################################################"
echo
if [ ! -d $EBULK_DATA_PATH ]; then
mkdir $EBULK_DATA_PATH 2>/dev/null
if [ ! $? -eq 0 ]; then
echo
echo -e "${ORANGE}[ERROR] Could not find/create $EBULK_DATA_PATH. The tool needs access to $EBULK_DATA_PATH."
echo -e "[INFO] The directory $EBULK_DATA_PATH must exists to run the tool.${NC}"
echo
fi
fi
REFERENCE=$2
CUSTOM=false
ADVANCED=false
while [ "$1" != "" ]; do
case $1 in
-d | --directory ) shift
DATASET_DIR=$1
;;
-s | --storage ) shift
STORAGE=$1
;;
-cs | --custom-storage ) STORAGE="custom-storage"
CUSTOM=true
;;
-a | --advanced ) ADVANCED=true
;;
-c | --chunk ) shift
CHUNK=$1
;;
-h | --help ) cat $TOOL_PATH/help.md
exit
;;
-r | --readme ) less $TOOL_PATH/README.md
exit
;;
pull ) OPERATION=$1
;;
push ) OPERATION=$1
;;
*) if [[ $REFERENCE != $1 ]]; then
echo -e "${ORANGE}[ERROR] Invalid parameter '$1'.${NC}"
echo
helpReadme >&2; exit
fi
esac
shift
done
if [[ $OPERATION = "" ]]; then
echo -e "${ORANGE}[ERROR] Please specify a valid operation.${NC}"
echo
helpReadme >&2; exit
fi
if [[ $REFERENCE = "" ]]; then
echo -e "${ORANGE}[ERROR] Dataset not specified."
echo -e "[INFO] Please specify a valid dataset.${NC}"
echo
helpReadme >&2; exit
fi
if [[ $DATASET_DIR = "" ]]; then
DATASET_DIR=$REFERENCE
fi
if [[ $CHUNK = "" ]]; then
CHUNK=$DEFAULT_CHUNK_SIZE
fi
case $OPERATION in
pull)
FILE=$DOWN_FILE
TEMPLATE_FILE=$DOWN_TEMPLATE_FILE
if [ "$STORAGE" != "" ] ; then
STORAGE=""
echo "** Only file system storage is available for download currently."
echo
fi
echo "### DATASET DOWNLOAD ###"
echo
echo -e "** The dataset will be downloaded in the specified directory: $DATASET_DIR"
echo
read -n 1 -s -r -p "Press any key to continue"
echo
runProcess
;;
push)
MESSAGE="storage: $STORAGE"
if [ "$CUSTOM" = true ] ; then
FILE=$CUSTOM_ING_FILE
TEMPLATE_FILE=$CUSTOM_ING_TEMPLATE_FILE
PARAMETER_FUNCTION=askCustomParameters
else
case $STORAGE in
"" ) FILE=$ING_FILE
TEMPLATE_FILE=$ING_TEMPLATE_FILE
MESSAGE="directory: $DATASET_DIR"
;;
s3 ) FILE=$S3_ING_FILE
TEMPLATE_FILE=$S3_ING_TEMPLATE_FILE
PARAMETER_FUNCTION=askS3parameters
STORAGE_GEM=embulk-input-s3
;;
http ) FILE=$HTTP_ING_FILE
TEMPLATE_FILE=$HTTP_ING_TEMPLATE_FILE
PARAMETER_FUNCTION=askHTTPparameters
STORAGE_GEM=embulk-input-http
;;
ftp ) FILE=$FTP_ING_FILE
TEMPLATE_FILE=$FTP_ING_TEMPLATE_FILE
PARAMETER_FUNCTION=askFTPparameters
STORAGE_GEM=embulk-input-ftp
;;
*) echo -e "${ORANGE}[ERROR] '$STORAGE' storage is not available in ebulk tool yet.${NC}"
echo "[INFO] If you want to configure yourself this storage, you can run the tool with parameter --custom-storage"
echo
exit
esac
fi
echo "### DATASET INGESTION ###"
echo
echo -e "** The tool will look for dataset files in the specified $MESSAGE"
echo -e "** Please make sure to put your dataset files there for ingestion."
echo
read -n 1 -s -r -p "Press any key to continue"
echo
runProcess
;;
esac
exit
# ------ EBULK INGESTION-DOWNLOAD TOOL ------
# CONTENT:
- Bash script for ingestion and download
- Embulk plugins
- Configuration files (yml)
# REQUIREMENTS
This tool relies on the **Embulk** Java application (see [docs](http://www.embulk.org/)).
Please make sure that [Java 8](http://www.oracle.com/technetwork/java/javase/downloads/index.html) is installed.
On first use after installing the package, the bash script will try to install Embulk automatically (if it is not already installed).
If your OS needs special permissions, it may be necessary to install Embulk 0.9.7 manually:
curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.bintray.com/embulk/maven/embulk-0.9.7.jar"
chmod +x ~/.embulk/bin/embulk
echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc
# ------ DOWNLOAD ------
# QUICK START
To start the download, run the following command:
```
ebulk pull <DATA_SET>
```
where `<DATA_SET>` is the dataset reference shown on the site.
(e.g. **ebulk pull my-dataset**)
This will automatically install Embulk (if needed) and ask for user credentials.
After authentication, it will start the download and create an output directory named after the dataset reference, containing the downloaded files.
`<DATA_SET>` can also be a path, in which case the last directory will be interpreted as the dataset reference
(e.g. **ebulk pull my_directory/sample/** --> the dataset reference will be "sample")
# CUSTOMIZE CHUNK SIZE
If you need to specify the chunk size used to split the download (e.g. due to memory errors with big files),
run the command with these parameters:
```
ebulk pull <DATA_SET> -c <CHUNK_SIZE>
```
where `<CHUNK_SIZE>` is an integer setting the size in MB.
(e.g. **ebulk pull my-dataset -c 10**)
# CUSTOMIZE OUTPUT DIRECTORY
Allows using a custom output directory, different from the dataset reference. That location will be linked to the dataset reference.
```
ebulk pull <DATA_SET> -d <PATH>
```
where `<PATH>` is the output location of the downloaded files.
(e.g. **ebulk pull my-dataset -d some/different/path**)
The content of `<DATA_SET>` will be downloaded into `<PATH>`, and that location will be linked to the reference `<DATA_SET>`.
This means that, no matter whether the directory is moved or renamed, later operations will still refer to the dataset reference:
(e.g. **ebulk pull moved/or/renamed/path** will try to download the dataset 'my-dataset')
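For illustration, the link is kept in a hidden `.ebulk_dataset` marker file inside the output directory (paths below are made up):
```
$ ebulk pull my-dataset -d some/different/path
$ cat some/different/path/.ebulk_dataset
my-dataset
```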
# ------ INGESTION ------
# QUICK START
To start the ingestion, run the following command:
```
ebulk push <DATA_SET>
```
where `<DATA_SET>` is both the dataset reference and the input directory where the files are.
(e.g. **ebulk push my-dataset**)
This will automatically install Embulk and it will ask for user credentials.
After authentication, it will start the ingestion.
# CUSTOMIZE CHUNK SIZE AND OUTPUT DIRECTORY
The chunk size used to split the ingestion and the input directory customization work as in the download operation.
(e.g. **ebulk push my-dataset -c 10**)
(e.g. **ebulk push my-dataset -d some/different/path**)
# USE A DIFFERENT INPUT STORAGE
The ebulk tool has some preinstalled input storages that can be used to ingest from locations other than the file system. These are:
- File transfer protocol: ftp
- HTTP request: http
- Amazon web service S3: s3
To use one of those storages as input, run the following command:
```
ebulk push <DATA_SET> --storage <STORAGE>
```
where `<STORAGE>` is one of the following available inputs: ftp, http, s3
(e.g. **ebulk push my-dataset --storage http**)
Each storage will request user input such as credentials, URLs, etc., depending on the case.
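As a rough, abridged illustration of the prompts for the s3 storage (all values below are made up):
```
$ ebulk push my-dataset --storage s3
Bucket name:
my-bucket
Path prefix:
data/samples/
Access Key ID for authentication:
AKIAXXXXXXXXXXXXXXXX
Secret Access Key for authentication:
********
```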
# ADVANCED STORAGE
The user can edit the Embulk configuration file of the selected storage to run more complex scenarios.
* Please keep in mind that some knowledge about Embulk is required.
```
ebulk push <DATA_SET> --storage <STORAGE> --advanced
```
# CUSTOM
The user can request the installation of a new input storage by running the following command:
```
ebulk push <DATA_SET> --custom-storage
```
The tool will ask the user for the desired Embulk input plugin (gem) in order to install it.
The input gem can be picked from here: http://www.embulk.org/plugins/
exec:
max_threads: 1
min_output_tasks: 1
in:
type: file
path_prefix: ./csv/
parser:
charset: UTF-8
type: csv
delimiter: ';'
columns:
- {name: id, type: string}
- {name: id2, type: string}
- {name: id3, type: string}
- {name: id4, type: string}
out:
type: wendelin
erp5_url: "https://softinst102878.host.vifib.net/erp5/portal_ingestion_policies/wendelin_embulk"
user: "zope"
password: "asd"
exec:
max_threads: 1
min_output_tasks: 1
in:
type: file
path_prefix: ./csv/
parser:
charset: UTF-8
# newline: CRLF
type: csv
delimiter: ';'
# quote: '"'
# escape: ''
# null_string: 'NULL'
columns:
- {name: id, type: string}
- {name: id2, type: string}
- {name: id3, type: string}
- {name: id4, type: string}
out:
type: wendelin
erp5_url: "https://softinst102878.host.vifib.net/erp5/portal_ingestion_policies/wendelin_embulk"
user: "zope"
password: "asd"
exec:
max_threads: 1
min_output_tasks: 1
in:
type: wendelin
erp5_url: "https://softinst102878.host.vifib.net/erp5/"
user: "asd"
password: "asd"
data_set: "sample"
chunk_size: "50"
output_path: "sample"
tool_dir: "."
out:
type: fif
output_path: "sample"
tool_dir: "."
exec:
max_threads: 1
min_output_tasks: 1
in:
type: wendelin
erp5_url: $DOWN_URL
user: $USER
password: $pwd
data_set: $DATA_SET
chunk_size: $CHUNK
output_path: $DATASET_DIR
tool_dir: $TOOL_DIR
out:
type: fif
output_path: $DATASET_DIR
exec:
max_threads: 1
min_output_tasks: 1
in:
type: wendelin
erp5_url: $DOWN_URL
user: $USER
password: $pwd
data_set: $DATA_SET
chunk_size: $CHUNK
output_path: $DATASET_DIR
tool_dir: $TOOL_DIR
out:
type: fif
output_path: $DATASET_DIR
tool_dir: $TOOL_DIR
exec:
max_threads: 1
min_output_tasks: 1
in:
type: fif
path_prefix: ["input/"]
supplier: [SUPPLIER]
data_set: [DATA_SET]
chunk_size: 0
out:
type: wendelin
erp5_url: 'https://softinst79462.host.vifib.net/erp5/portal_ingestion_policies/wendelin_embulk'
user: [USER]
password: [PASSWORD]
tag: supplier.dataset.filename.extension.end
exec:
max_threads: 1
min_output_tasks: 1
in:
type: fif
path_prefix: [$DATASET_DIR]
supplier: $USER
data_set: $DATA_SET
chunk_size: $CHUNK
erp5_url: $DOWN_URL
user: $USER
password: $pwd
tool_dir: $TOOL_DIR
out:
type: wendelin
erp5_url: $ING_URL
user: $USER
password: $pwd
exec:
max_threads: 1
min_output_tasks: 1
in:
type: fif
path_prefix: [$DATASET_DIR]
supplier: $USER
data_set: $DATA_SET
chunk_size: $CHUNK
erp5_url: $DOWN_URL
user: $USER
password: $pwd
tool_dir: $TOOL_DIR
out:
type: wendelin
erp5_url: $ING_URL
user: $USER
password: $pwd
tool_dir: $TOOL_DIR
# CUSTOM CONFIGURATION FILE
# PLEASE FILL THE FILE WITH THE CONFIGURATION OF YOUR CUSTOM EMBULK PLUGIN
# ONLY THE 'IN' SECTION, OTHERS MUST REMAIN AS THEY ARE
# PLEASE FILL THE 'IN' SECTION ACCORDING TO YOUR PLUGIN
in:
# FOR EXAMPLE CSV FILES
# type: file
# path_prefix: MY_CSV_DIRECTORY
# FOR EXAMPLE AWS-S3 storage:
# type: s3
# bucket: MY_BUCKET
# path_prefix: ""
# access_key_id: MY_KEY_ID
# secret_access_key: MY_SECRET_KEY
# PLEASE LEAVE THE SECTIONS BELOW AS THEY ARE (unless you know what you are doing)
parser:
type: binary
supplier: $USER
data_set: $DATA_SET
tool_dir: $TOOL_DIR
chunk_size: $CHUNK
storage: $STORAGE
out:
type: wendelin
erp5_url: $ING_URL
user: $USER
password: $pwd
exec:
max_threads: 1
min_output_tasks: 1
# CUSTOM CONFIGURATION FILE
# PLEASE FILL THE FILE WITH THE CONFIGURATION OF YOUR CUSTOM EMBULK PLUGIN
# ONLY THE 'IN' SECTION, OTHERS MUST REMAIN AS THEY ARE
# PLEASE FILL THE 'IN' SECTION ACCORDING TO YOUR PLUGIN
in:
# FOR EXAMPLE CSV FILES
# type: file
# path_prefix: MY_CSV_DIRECTORY
# FOR EXAMPLE AWS-S3 storage:
# type: s3
# bucket: MY_BUCKET
# path_prefix: ""
# access_key_id: MY_KEY_ID
# secret_access_key: MY_SECRET_KEY
# PLEASE LEAVE THE SECTIONS BELOW AS THEY ARE (unless you know what you are doing)
parser:
type: binary
supplier: $USER
data_set: $DATA_SET
tool_dir: $TOOL_DIR
chunk_size: $CHUNK
input_plugin: $STORAGE
out:
type: wendelin
erp5_url: $ING_URL
user: $USER
password: $pwd
exec:
max_threads: 1
min_output_tasks: 1
# FTP CONFIGURATION FILE
# PLEASE FILL THE FILE WITH THE CONFIGURATION OF YOUR FTP STORAGE
# ONLY THE 'IN' SECTION, OTHERS MUST REMAIN AS THEY ARE
in:
type: ftp
host: $FTP_HOST
user: $FTP_USER
password: $FTP_PASSWORD
path_prefix: $FTP_PATH
#ssl_verify: false
#port: 21
# PLEASE LEAVE THE SECTIONS BELOW AS THEY ARE (unless you know what you are doing)
parser:
type: binary
supplier: $USER
data_set: $DATA_SET
tool_dir: $TOOL_DIR
chunk_size: $CHUNK
storage: $STORAGE
out:
type: wendelin
erp5_url: $ING_URL
user: $USER
password: $pwd
exec:
max_threads: 1
min_output_tasks: 1
# FTP CONFIGURATION FILE
# PLEASE FILL THE FILE WITH THE CONFIGURATION OF YOUR FTP STORAGE
# ONLY THE 'IN' SECTION, OTHERS MUST REMAIN AS THEY ARE
in:
type: ftp
host: $FTP_HOST
user: $FTP_USER
password: $FTP_PASSWORD
path_prefix: $FTP_PATH
#ssl_verify: false
#port: 21
# PLEASE LEAVE THE SECTIONS BELOW AS THEY ARE (unless you know what you are doing)
parser:
type: binary
supplier: $USER
data_set: $DATA_SET
tool_dir: $TOOL_DIR
chunk_size: $CHUNK
storage: $STORAGE
out:
type: wendelin
erp5_url: $ING_URL
user: $USER
password: $pwd
exec:
max_threads: 1
min_output_tasks: 1
# HTTP CONFIGURATION FILE
# PLEASE FILL THE FILE WITH THE CONFIGURATION OF YOUR HTTP URL
# ONLY THE 'IN' SECTION, OTHERS MUST REMAIN AS THEY ARE
in:
type: http
url: "http://archive.ics.uci.edu/ml/machine-learning-databases/00000/Donnees%20conso%20autos.txt"
method: "get"
# basic_auth:
# user: MyUser
# password: MyPassword
# params:
# - {name: paramA, value: valueA}
# - {name: paramB, value: valueB}
# PLEASE LEAVE THE SECTIONS BELOW AS THEY ARE (unless you know what you are doing)
parser:
type: binary
supplier: "zope"
data_set: "http"
tool_dir: "."
chunk_size: "50"
storage: "http"
path_prefix:
out:
type: wendelin
erp5_url: "https://softinst102878.host.vifib.net/erp5/portal_ingestion_policies/wendelin_embulk"
user: "zope"
password: "telecom"
exec:
max_threads: 1
min_output_tasks: 1
# HTTP CONFIGURATION FILE
# PLEASE FILL THE FILE WITH THE CONFIGURATION OF YOUR HTTP URL
# ONLY THE 'IN' SECTION, OTHERS MUST REMAIN AS THEY ARE
in:
type: http
url: $HTTP_URL
method: $HTTP_METHOD
# basic_auth:
# user: MyUser
# password: MyPassword
# params:
# - {name: paramA, value: valueA}
# - {name: paramB, value: valueB}
# PLEASE LEAVE THE SECTIONS BELOW AS THEY ARE (unless you know what you are doing)
parser:
type: binary
supplier: $USER
data_set: $DATA_SET
tool_dir: $TOOL_DIR
chunk_size: $CHUNK
storage: $STORAGE
out:
type: wendelin
erp5_url: $ING_URL
user: $USER
password: $pwd
exec:
max_threads: 1
min_output_tasks: 1
# HTTP CONFIGURATION FILE
# PLEASE FILL THE FILE WITH THE CONFIGURATION OF YOUR HTTP URL
# ONLY THE 'IN' SECTION, OTHERS MUST REMAIN AS THEY ARE
in:
type: http
url: $HTTP_URL
method: $HTTP_METHOD
# basic_auth:
# user: MyUser
# password: MyPassword
# params:
# - {name: paramA, value: valueA}
# - {name: paramB, value: valueB}
# PLEASE LEAVE THE SECTIONS BELOW AS THEY ARE (unless you know what you are doing)
parser:
type: binary
supplier: $USER
data_set: $DATA_SET
tool_dir: $TOOL_DIR
chunk_size: $CHUNK
storage: $STORAGE
path_prefix: $HTTP_PREFIX
out:
type: wendelin
erp5_url: $ING_URL
user: $USER
password: $pwd
exec:
max_threads: 1
min_output_tasks: 1
exec:
max_threads: 1
min_output_tasks: 1
in:
type: s3
bucket: "roque5"
path_prefix: ""
access_key_id: "AKIAJLY3N4YBNAJMBLGQ"
secret_access_key: "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"
parser:
type: binary
supplier: "zope"
data_set: "encoding"
tool_dir: "."
chunk_size: "5"
input_plugin "s3"
out:
type: wendelin
erp5_url: "https://softinst102878.host.vifib.net/erp5/portal_ingestion_policies/wendelin_embulk"
user: "zope"
password: "telecom"
# S3 CONFIGURATION FILE
# PLEASE FILL THE FILE WITH THE CONFIGURATION OF YOUR S3 BUCKET
# ONLY THE 'IN' SECTION, OTHERS MUST REMAIN AS THEY ARE
in:
type: s3
bucket: $S3_BUCKET
path_prefix: $S3_PREFIX
access_key_id: $S3_ACCESS_KEY
secret_access_key: $S3_SECRET_KEY
auth_method: $S3_AUTH_METHOD
# endpoint:
# region:
# path_match_pattern:
# http_proxy:
# host:
# port:
# PLEASE LEAVE THE SECTIONS BELOW AS THEY ARE (unless you know what you are doing)
parser:
type: binary
supplier: $USER
data_set: $DATA_SET
tool_dir: $TOOL_DIR
chunk_size: $CHUNK
storage: $STORAGE
out:
type: wendelin
erp5_url: $ING_URL
user: $USER
password: $pwd
exec:
max_threads: 1
min_output_tasks: 1
# S3 CONFIGURATION FILE
# PLEASE FILL THE FILE WITH THE CONFIGURATION OF YOUR S3 BUCKET
# ONLY THE 'IN' SECTION, OTHERS MUST REMAIN AS THEY ARE
in:
type: s3
bucket: $S3_BUCKET
path_prefix: $S3_PREFIX
access_key_id: $S3_ACCESS_KEY
secret_access_key: $S3_SECRET_KEY
auth_method: $S3_AUTH_METHOD
# endpoint:
# region:
# path_match_pattern:
# http_proxy:
# host:
# port:
# PLEASE LEAVE THE SECTIONS BELOW AS THEY ARE (unless you know what you are doing)
parser:
type: binary
supplier: $USER
data_set: $DATA_SET
tool_dir: $TOOL_DIR
chunk_size: $CHUNK
storage: $STORAGE
path_prefix: $S3_PREFIX
out:
type: wendelin
erp5_url: $ING_URL
user: $USER
password: $pwd
exec:
max_threads: 1
min_output_tasks: 1
source 'https://rubygems.org/'
gemspec
MIT License
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# Embulk-wendelin-dataset-tool input-output plugin for Embulk
Input and output plugins for wendelin dataset-tool.
################### INPUT PLUGINS ###################
## Overview
* **Plugin type**: fif
* **Resume supported**: not for now
* **Cleanup supported**: not for now
* **Guess supported**: no
## Configuration
- **path_prefix**: description (array, required)
- **supplier**: description (string, default: `"default"`)
- **dataset**: description (string, default: `"default"`)
- **chunk_size**: description (integer, default: `0`)
## Schema (self included in plugin)
- {"name"=>"supplier", "type"=>"string"}
- {"name"=>"dataset", "type"=>"string"}
- {"name"=>"file", "type"=>"string"},
- {"name"=>"extension", "type"=>"string"}
- {"name"=>"end", "type"=>"string"}
- {"name"=>"data_chunk", "type"=>"string"}
## Overview
* **Plugin type**: wendelin
* **Resume supported**: not for now
* **Cleanup supported**: not for now
* **Guess supported**: no
## Configuration
- **erp5_url**: description (array, required)
- **user**: description (string, required)
- **password**: description (string, required)
- **supplier**: description (string, default: `"default"`)
- **dataset**: description (string, default: `"default"`)
- **chunk_size**: description (integer, default: `0`)
################### OUTPUT PLUGINS ###################
## Overview
* **Plugin type**: wendelin
* **Resume supported**: not for now
* **Cleanup supported**: not for now
* **Guess supported**: no
## Configuration
- **erp5_url**: description (array, required)
- **user**: description (string, required)
- **password**: description (string, required)
- **tag**: "supplier.dataset.filename.extension.end"
## Overview
* **Plugin type**: fif
* **Resume supported**: not for now
* **Cleanup supported**: not for now
* **Guess supported**: no
## Configuration
- **output_path**: description (string, required)
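For reference, the ebulk wrapper script loads these plugins by passing their directory to Embulk with `-L`. A minimal manual invocation would look roughly like this (paths are illustrative):
```
~/.embulk/bin/embulk run -L ./embulk-wendelin-dataset-tool ~/.ebulk/download-config.yml
```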
require "bundler/gem_tasks"
task default: :build
Gem::Specification.new do |spec|
spec.name = "embulk-wendelin-dataset-tool"
spec.version = "0.1.0"
spec.authors = ["Roque Porchetto"]
spec.summary = "Input/output plugin for ingestion/download of datasets to/from wendelin"
spec.description = "Loads records from fif files in local storage and send them to wendelin via http. Loads records from wendelin via http and store them locally."
spec.email = ["roqueporchetto@gmail.com"]
spec.licenses = ["MIT"]
# TODO set this: spec.homepage = "https://github.com/roqueporchetto/embulk-wendelin-dataset-tool"
spec.files = `git ls-files`.split("\n") + Dir["classpath/*.jar"]
spec.test_files = spec.files.grep(%r{^(test|spec)/})
spec.require_paths = ["lib"]
#spec.add_dependency 'YOUR_GEM_DEPENDENCY', ['~> YOUR_GEM_DEPENDENCY_VERSION']
spec.add_development_dependency 'embulk', ['>= 0.8.15']
spec.add_development_dependency 'bundler', ['>= 1.10.6']
spec.add_development_dependency 'rake', ['>= 10.0']
end
require 'singleton'
# class representing a file logger
class LogManager
include Singleton
INFO = "INFO"
ERROR = "ERROR"
WARN = "WARNING"
def initialize()
now = Time.now.strftime("%d-%m-%Y")
@filename = "#{now.to_s}.log"
end
def setFilename(tool_dir, prefix)
log_dir = "#{tool_dir}/"
if not File.directory?(log_dir)
Dir.mkdir log_dir
end
@path = "#{log_dir}#{prefix}_#{@filename}"
File.open(@path, 'a') { |file| file.puts "------------------------------------------------" + "\r\n" }
end
def getLogPath()
return @path
end
def info(message, print=FALSE)
log(message, print, type=INFO)
end
def warn(message, print=FALSE)
log(message, print, type=WARN)
end
def error(message, print=FALSE)
log(message, print, type=ERROR)
end
def abortExecution()
puts
info("PROCESS ABORTED")
unless @path.nil?
puts "PROCESS ABORTED : For more detailed information, please refer to the log file '#{@path}'"
end
exec("Process.kill 9, Process.pid >/dev/null 2>&1")
end
def logOutOfMemoryError(reference)
error("The data chunk size is too large. Proccess aborted for #{reference}.", print=TRUE)
info("Please, check the help or README.md to customize the chunk size.", print=TRUE)
end
def logArray(messagesArray, print, type)
messagesArray.each { |message| log(message, print, type) }
end
def log(message, print=FALSE, type=INFO)
if message.kind_of?(Array)
logArray(message, print, type)
return
end
if print
puts "[#{type}] #{message}"
end
now = Time.now.strftime("%d-%m-%Y %H:%M:%S").to_s
entry = "#{now} - #{type} : #{message}" + "\r\n"
unless @path.nil?
File.open(@path, 'a') { |file| file.puts entry }
end
end
end
require_relative '../wendelin_client'
require_relative '../filelogger'
module Embulk
module Input
class Fifinput < InputPlugin
TASK_REPORT_FILE = ".ingestion-task-report"
COMPLETED_FILE = ".ingestion-completed"
RUN_DONE = "done"
RUN_ERROR = "error"
RUN_ABORTED = "aborted"
Plugin.register_input("fif", self)
def self.getTaskReportFilename(data_set_directory)
return data_set_directory + TASK_REPORT_FILE
end
def self.getCompletedFilename(data_set_directory)
return data_set_directory + COMPLETED_FILE
end
EOF = "EOF"
CHUNK_SIZE = 50000000 #50mb
MEGA = 1000000
SCHEMA = [
{"name"=>"supplier", "type"=>"string"},
{"name"=>"data_set", "type"=>"string"},
{"name"=>"file", "type"=>"string"},
{"name"=>"extension", "type"=>"string"},
{"name"=>"data_chunk", "type"=>"string"},
{"name"=>"eof", "type"=>"string"}
]
def self.filterDoneTasks(files, task_report_file, data_set, root_path)
data_set = data_set.end_with?("/") ? data_set : data_set + "/"
ingested_files = []
File.readlines(task_report_file).each do |line|
record = line.split(";")
if record[1].chomp == RUN_DONE
ingested = record[0].sub(data_set, root_path)
ingested_files.push(ingested)
end
end
pending_files = []
files.each do |file|
if ! ingested_files.include? file
pending_files.push(file)
end
end
return pending_files
end
def self.filterExistingFiles(files, data_streams, data_set, root_path)
data_set = data_set.end_with?("/") ? data_set : data_set + "/"
existing_files = []
data_streams["result"].each do |data_stream|
file_path = root_path + data_stream[1].reverse.sub("/".reverse, ".".reverse).reverse.sub(data_set, "")
if file_path.end_with?(".none")
file_path = file_path[0...-5]
end
existing_files.push(file_path)
end
filtered_files = files - existing_files
ignored_files = files - filtered_files
if not ignored_files.empty?
puts
@logger.info("There are files in the local directory that already exist in data set and will be ignored for ingestion.", print=TRUE)
puts
end
if filtered_files.empty?
@logger.info("The dataset directory '#{root_path}' does not contain any new file for the data set '#{data_set}'.", print=TRUE)
@logger.info("Please make sure your dataset directory contains new files for ingestion.", print=TRUE)
@logger.abortExecution()
end
return filtered_files
end
def self.transaction(config, &control)
begin
tool_dir = config.param('tool_dir', :string)
@logger = LogManager.instance()
@logger.setFilename(tool_dir, "ingestion")
task = { 'paths' => [] }
task['supplier'] = config.param('supplier', :string)
task['data_set'] = config.param('data_set', :string)
task['chunk_size'] = config.param('chunk_size', :float, default: 0) * MEGA
if task['chunk_size'] == 0
task['chunk_size'] = CHUNK_SIZE
end
paths = config.param('path_prefix', :array)
paths[0] = paths[0].end_with?("/") ? paths[0] : paths[0] + "/"
task['inputs'] = paths
task['paths'] = paths.map {|path|
next [] unless Dir.exist?(path)
Dir[(path + '/**/*').gsub! '//', '/']
}.flatten.select{ |file| File.file?(file) }
@wendelin = WendelinClient.new(config.param('erp5_url', :string), config.param('user', :string), config.param('password', :string))
data_stream_list = @wendelin.getDataStreams(task['data_set'])
if data_stream_list["status_code"] == 0
task['data_streams'] = data_stream_list["result"]
else
@logger.error(data_stream_list["error_message"], print=TRUE)
@logger.abortExecution()
end
@logger.info("Supplier: #{task['supplier']}")
@logger.info("Dataset name: #{task['data_set']}")
@logger.info("Chunk size set in #{task['chunk_size']/MEGA}MB")
if task['data_set'] == '$DATA_SET'
@logger.error("There was an error setting the configuration file", print=TRUE)
@logger.info("Please try manual ingestion or update manually the ingestion configuration file.", print=TRUE)
@logger.abortExecution()
end
if task['paths'].empty?
@logger.error("The dataset directory '#{task['inputs'][0]}' is empty.", print=TRUE)
@logger.error("Could not find any valid file.", print=TRUE)
@logger.error("Please make sure your dataset directory contains files for ingestion.", print=TRUE)
@logger.abortExecution()
end
@data_set_directory = paths[0]
task_report_file = getTaskReportFilename(@data_set_directory)
completed_file = getCompletedFilename(@data_set_directory)
delete_task_report = FALSE
if File.file?(task_report_file)
task['paths'] = filterDoneTasks(task['paths'], task_report_file, task['data_set'], @data_set_directory)
if File.file?(completed_file)
delete_task_report = TRUE
else
puts
@logger.info("There was a previous attempt to ingest this dataset but it did not finish successfully.", print=TRUE)
@logger.info("Resuming ingestion...", print=TRUE)
end
else
File.open(task_report_file, 'w') {}
end
task['paths'] = filterExistingFiles(task['paths'], data_stream_list, task['data_set'], @data_set_directory)
@logger.info("#{task['paths'].length} new file(s) detected for ingestion: ", print=TRUE)
if task['paths'].length > 15
@logger.info(task['paths'][0, 5], print=TRUE)
@logger.info(".....", print=TRUE)
@logger.info(task['paths'][task['paths'].length-5, task['paths'].length-1], print=TRUE)
else
@logger.info(task['paths'], print=TRUE)
end
puts
@logger.info("Continue with ingestion? (y/n)", print=TRUE)
option = gets
option = option.chomp
if option == "n"
@logger.info("Ingestion cancelled by user.", print=TRUE)
@logger.abortExecution()
end
if delete_task_report
File.delete(task_report_file) if File.exist?(task_report_file)
File.open(task_report_file, 'w') {}
end
File.delete(completed_file) if File.exist?(completed_file)
columns = [
Column.new(0, "supplier", :string),
Column.new(1, "data_set", :string),
Column.new(2, "file", :string),
Column.new(3, "extension", :string),
Column.new(4, "data_chunk", :string),
Column.new(5, "eof", :string)
]
commit_reports = yield(task, columns, task['paths'].length)
done = commit_reports.map{|hash| hash["done"]}.flatten.compact
resume(task, columns, task['paths'].length, &control)
rescue Exception => e
@logger.error("An error occurred during operation: " + e.to_s, print=TRUE)
@logger.error(e.backtrace)
puts "[INFO] For more detailed information, please refer to the log file."
@logger.abortExecution()
end
end
def self.resume(task, columns, count, &control)
@logger = LogManager.instance()
task_reports = yield(task, columns, count)
@logger.info("Reports:", print=TRUE)
if task_reports.length > 15
@logger.info(task_reports[0, 5], print=TRUE)
@logger.info(".....", print=TRUE)
@logger.info(task_reports[task_reports.length-5, task_reports.length-1], print=TRUE)
@logger.info("Full task report:")
@logger.info(task_reports)
else
@logger.info(task_reports, print=TRUE)
end
next_config_diff = task_reports.map{|hash| hash["done"]}.flatten.compact
@logger.info("#{next_config_diff.length} file(s) ingested.", print=TRUE)
#if next_config_diff.length > 0
# @logger.info("FIF INPUT - Your ingested files will be available in the site in a few minutes. Thank for your patience.", print=TRUE)
#end
if(next_config_diff.length == count)
@logger.info("Dataset successfully ingested.", print=TRUE)
completed_file = getCompletedFilename(@data_set_directory)
File.open(completed_file, 'w') {}
else
next_config_diff = task_reports.map{|hash| hash["error"]}.flatten.compact
puts
@logger.error("The following files could not be ingested. Please check the details in the log file: " + @logger.getLogPath(), print=TRUE)
if next_config_diff.length > 15
@logger.error(next_config_diff[0, 5], print=TRUE)
@logger.error(".....", print=TRUE)
@logger.error(next_config_diff[next_config_diff.length-5, next_config_diff.length-1], print=TRUE)
else
@logger.error(next_config_diff, print=TRUE)
end
@logger.info("You can retry the ingestion for those pending files.", print=TRUE)
puts
end
next_config_diff = {}
return {"done" => next_config_diff}
end
def initialize(task, schema, index, page_builder)
super
@supplier = task['supplier']
@dataset = task['data_set']
@chunk_size = task['chunk_size']
@input_dirs = task['inputs']
@data_set_directory = task['inputs'][0]
@logger = LogManager.instance()
end
def run
path = task['paths'][@index]
@logger.info("Processing file #{path.to_s}", print=TRUE)
begin
each_chunk(path, schema[1..-1].map{|elm| elm.name}, @chunk_size) do |entry|
@page_builder.add(entry)
end
@page_builder.finish
rescue java.lang.OutOfMemoryError
@logger.logOutOfMemoryError(path)
return_value = RUN_ABORTED
rescue Exception => e
@logger.error("An error occurred during file ingestion: " + e.to_s, print=TRUE)
@logger.error(e.backtrace)
puts "[INFO] For more detailed information, please refer to the log file: " + @logger.getLogPath()
return_value = RUN_ERROR
else
return_value = RUN_DONE
end
task_report_file = Fifinput.getTaskReportFilename(@data_set_directory)
dataset_file = path.sub(@data_set_directory, @dataset.end_with?("/") ? @dataset : @dataset + "/")
File.open(task_report_file, 'ab') { |file| file.puts(dataset_file+";"+return_value+";") }
return {return_value => path}
end
private
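# Streams the file at 'path' in chunks of 'chunk_size' bytes: each chunk is Base64-encoded and yielded
# as a record. Intermediate chunks are numbered "001", "002", ... and the last chunk (or an empty file)
# is marked with EOF; a one-byte look-ahead read is used to detect the end of the file.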
def each_chunk(path, fields, chunk_size=CHUNK_SIZE)
extension = File.extname path
if path.start_with?(@input_dirs[0])
filename = path.sub(@input_dirs[0], "")
filename = filename.reverse.sub(extension.reverse, "").reverse
end
extension.gsub! '.', ''
if extension == ""
extension = "none"
end
file_object = File.open(path, "rb")
npart = 0
next_byte = file_object.read(1)
first = TRUE
while true
data = next_byte
if not next_byte
if first
values = [@supplier, @dataset, filename, extension, "", EOF]
yield(values)
end
break
end
first = FALSE
data += file_object.read(chunk_size)
next_byte = file_object.read(1)
if not next_byte
eof = EOF
else
npart += 1
eof = npart.to_s.rjust(3, "0")
end
content = Base64.encode64(data)
values = [@supplier, @dataset, filename, extension, content, eof]
yield(values)
end
end
end
end
end
require_relative '../wendelin_client'
require 'fileutils'
module Embulk
module Input
class Wendelininput < InputPlugin
CHUNK_SIZE = 50000000 #50mb
MEGA = 1000000
UPDATE = "U"
RESUME = "R"
DOWNLOAD = "D"
ABORT = "A"
TASK_REPORT_FILE = ".download-task-report"
COMPLETED_FILE = ".download-completed"
RUN_DONE = "done"
RUN_ERROR = "error"
RUN_ABORTED = "aborted"
Plugin.register_input("wendelin", self)
def self.getTaskReportFilename(data_set_directory)
return data_set_directory + TASK_REPORT_FILE
end
def self.getCompletedFilename(data_set_directory)
return data_set_directory + COMPLETED_FILE
end
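# Returns the data streams that are not yet marked as done in the task report file,
# so that an interrupted download can be resumed where it stopped.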
def self.getPendingDataStreams(data_streams, task_report_file)
downloaded_data_streams = []
File.readlines(task_report_file).each do |line|
record = line.split(";")
if record[1].chomp == RUN_DONE
downloaded_data_streams.push(record[0])
end
end
pending_data_streams = []
data_streams.each do |data_stream|
if ! downloaded_data_streams.include? data_stream[1]
pending_data_streams.push(data_stream)
end
end
return pending_data_streams
end
def self.askUserForAction(task, completed_file, task_report_file, action)
if action == RESUME
action_message = "#{RESUME}: Resume. Continues download from last file."
else
action = UPDATE
action_message = "#{UPDATE}: Update. Checks for new files."
end
valid_option = FALSE
while not valid_option
@logger.info("Please select an option [#{action}, #{DOWNLOAD}, #{ABORT}]", print=TRUE)
@logger.info(action_message, print=TRUE)
@logger.info("#{DOWNLOAD}: Download. Downloads the dataset from scratch.", print=TRUE)
@logger.info("#{ABORT}: Abort operation.", print=TRUE)
option = gets
option = option.chomp
if not [action, DOWNLOAD, ABORT].include? option
@logger.info("Invalid option", print=TRUE)
else
valid_option = TRUE
end
end
case option
when action
task['data_streams'] = getPendingDataStreams(task['data_streams'], task_report_file)
File.delete(completed_file) if File.exist?(completed_file)
if task['data_streams'].empty?
@logger.info("No new files in dataset.", print=TRUE)
@logger.info("Your downloaded dataset is already up to date.", print=TRUE)
end
when DOWNLOAD
ebulk_file = @data_set_directory + "/.ebulk_dataset"
ebulk_file_content = ""
if File.file?(ebulk_file)
ebulk_file_content = File.read(ebulk_file)
end
FileUtils.rm_rf(@data_set_directory)
unless File.directory?(@data_set_directory)
FileUtils.mkdir_p(@data_set_directory)
end
if ebulk_file_content != ""
File.open(ebulk_file, 'w') { |file| file.write(ebulk_file_content) }
end
File.delete(completed_file) if File.exist?(completed_file)
File.open(task_report_file, 'w') {}
when ABORT
@logger.abortExecution()
end
end
def self.transaction(config, &control)
begin
@tool_dir = config.param('tool_dir', :string)
@logger = LogManager.instance()
@logger.setFilename(@tool_dir, "download")
@erp5_url = config.param('erp5_url', :string)
@data_set = config.param('data_set', :string)
@logger.info("Dataset name: #{@data_set}")
if @data_set == '$DATA_SET'
@logger.error("There was an error setting the configuration file", print=TRUE)
@logger.info("Please try manual download or update manually the download configuration file.", print=TRUE)
@logger.abortExecution()
end
@user = config.param("user", :string, defualt: nil)
@logger.info("User: #{@user}")
@password = config.param("password", :string, default: nil)
@chunk_size = config.param('chunk_size', :float, default: 0) * MEGA
@output_path = config.param("output_path", :string, :default => nil)
unless File.directory?(@output_path)
@logger.error("Output directory not found.", print=TRUE)
@logger.abortExecution()
end
@wendelin = WendelinClient.new(@erp5_url, @user, @password)
task = {
'erp5_url' => @erp5_url,
'data_set' => @data_set,
'user' => @user,
'password' => @password,
'chunk_size' => @chunk_size,
'output_path' => @output_path,
'tool_dir' => @tool_dir
}
if task['chunk_size'] == 0
task['chunk_size'] = CHUNK_SIZE
end
@logger.info("Chunk size set in #{task['chunk_size']/MEGA}MB")
@data_set_directory = @output_path.end_with?("/") ? @output_path : @output_path + "/"
task['data_set_directory'] = @data_set_directory
data_stream_list = @wendelin.getDataStreams(@data_set)
if data_stream_list["status_code"] == 0
if data_stream_list["result"].empty?
@logger.error("No valid data found for data set " + @data_set, print=TRUE)
@logger.info("Please use a valid dataset reference from the list of datasets available in the site.", print=TRUE)
@logger.abortExecution()
end
task['data_streams'] = data_stream_list["result"]
else
@logger.error(data_stream_list["error_message"], print=TRUE)
@logger.abortExecution()
end
task_report_file = getTaskReportFilename(@data_set_directory)
completed_file = getCompletedFilename(@data_set_directory)
if File.file?(task_report_file)
if File.file?(completed_file)
puts
@logger.info("This dataset was already downloaded. What do you want to do?", print=TRUE)
puts
self.askUserForAction(task, completed_file, task_report_file, action=UPDATE)
else
puts
@logger.info("There was a previous attempt to download this dataset but it did not finish successfully.", print=TRUE)
@logger.info("What do you want to do?", print=TRUE)
puts
self.askUserForAction(task, completed_file, task_report_file, action=RESUME)
end
else
dir_entries = Dir.entries(@data_set_directory).length
if File.file?(@data_set_directory+"/.ebulk_dataset")
dir_entries -= 1
end
if dir_entries > 2
puts
@logger.info("Dataset download directory is not empty! It will be overwritten: " + @data_set_directory, print=TRUE)
@logger.info("Continue with download? (y/n)", print=TRUE)
option = gets
option = option.chomp
if option == "n"
@logger.info("Download cancelled by user.", print=TRUE)
@logger.abortExecution()
end
end
ebulk_file = @data_set_directory + "/.ebulk_dataset"
ebulk_file_content = ""
if File.file?(ebulk_file)
ebulk_file_content = File.read(ebulk_file)
end
FileUtils.rm_rf(@data_set_directory)
unless File.directory?(@data_set_directory)
FileUtils.mkdir_p(@data_set_directory)
end
if ebulk_file_content != ""
File.open(ebulk_file, 'w') { |file| file.write(ebulk_file_content) }
end
File.open(task_report_file, 'w') {}
end
columns = [
Column.new(0, "reference", :string),
Column.new(1, "data_chunk", :string),
Column.new(2, "data_set", :string)
]
resume(task, columns, task['data_streams'].length, &control)
rescue Exception => e
@logger.error("An error occurred during operation: " + e.to_s, print=TRUE)
@logger.error(e.backtrace)
puts "[INFO] For more detailed information, please refer to the log file."
@logger.abortExecution()
end
end
def self.resume(task, columns, count, &control)
@logger = LogManager.instance()
task_reports = yield(task, columns, count)
if task_reports.any?
@logger.info("Reports:", print=TRUE)
if task_reports.length > 15
@logger.info(task_reports[0, 5], print=TRUE)
@logger.info(".....", print=TRUE)
@logger.info(task_reports[task_reports.length-5, task_reports.length-1], print=TRUE)
else
@logger.info(task_reports, print=TRUE)
end
@logger.info("Full task report:")
@logger.info(task_reports)
end
next_config_diff = task_reports.map{|hash| hash[RUN_DONE]}.flatten.compact
if(next_config_diff.length == count)
@logger.info("Dataset successfully downloaded.", print=TRUE)
@logger.info("#{count} files downloaded.", print=TRUE)
@logger.info("The files were saved in dataset directory: " + @data_set_directory, print=TRUE)
completed_file = getCompletedFilename(@data_set_directory)
File.open(completed_file, 'w') {}
if count > 10
next_config_diff = {}
end
end
return {RUN_DONE => next_config_diff}
end
def initialize(task, schema, index, page_builder)
super
@data_set = task['data_set']
@chunk_size = task['chunk_size']
@data_set_directory = task['data_set_directory']
@wendelin = WendelinClient.new(task['erp5_url'], task['user'], task['password'])
@logger = LogManager.instance()
end
def run
data_stream = task['data_streams'][@index]
id = data_stream[0]
ref = data_stream[1]
@logger.info("Getting content from remote #{ref}", print=TRUE)
begin
@wendelin.eachDataStreamContentChunk(id, @chunk_size) do |chunk|
if chunk.nil? || chunk.empty?
content = ""
else
content = Base64.encode64(chunk)
end
entry = [ref, content, @data_set]
page_builder.add(entry)
page_builder.finish
end
rescue java.lang.OutOfMemoryError
@logger.logOutOfMemoryError(ref)
return_value = RUN_ABORTED
rescue Exception => e
@logger.error("An error occurred during processing: " + e.to_s, print=TRUE)
@logger.error(e.backtrace)
puts "[INFO] For more detailed information, please refer to the log file: " + @logger.getLogPath()
return_value = RUN_ERROR
else
return_value = RUN_DONE
end
task_report_file = Wendelininput.getTaskReportFilename(@data_set_directory)
File.open(task_report_file, 'ab') { |file| file.puts(ref+";"+return_value+";") }
return {return_value => ref}
end
end
end
end
require 'base64'
require 'fileutils'
require_relative '../filelogger'
module Embulk
module Output
class Fifoutput < OutputPlugin
Plugin.register_output("fif", self)
def self.transaction(config, schema, count, &control)
@logger = LogManager.instance()
task = { "output_path" => config.param("output_path", :string, :default => nil) }
unless File.directory?(task['output_path'])
@logger.error("Output directory not found.", print=TRUE)
@logger.abortExecution()
end
task_reports = yield(task)
next_config_diff = {}
return next_config_diff
end
def init
@output_path = task["output_path"]
@logger = LogManager.instance()
end
def close
end
def add(page)
begin
page.each do |record|
reference = record[0]
data_chunk = Base64.decode64(record[1])
data_set_directory = @output_path.end_with?("/") ? @output_path : @output_path + "/"
ref = reference.reverse.sub("/".reverse, ".".reverse).reverse.sub(record[2]+"/", "")
if ref.end_with?(".none")
ref = ref[0...-5]
end
dirname = File.dirname(data_set_directory + ref)
unless File.directory?(dirname)
FileUtils.mkdir_p(dirname)
end
File.open(data_set_directory + ref, 'ab') { |file| file.write(data_chunk) }
end
rescue Exception => e
@logger.error("An error occurred while writing file.", print=TRUE)
@logger.error(e.backtrace)
raise e
end
end
def finish
end
def abort
end
def commit
task_report = {}
return task_report
end
end
end
end
require 'base64'
require_relative '../wendelin_client'
module Embulk
module Output
class Wendelin < OutputPlugin
Plugin.register_output("wendelin", self)
def self.transaction(config, schema, count, &control)
task = {
"erp5_url" => config.param("erp5_url", :string),
"user" => config.param("user", :string, defualt: nil),
"password" => config.param("password", :string, default: nil),
"path_prefix" => config.param("path_prefix", :string, :default => nil),
}
task_reports = yield(task)
next_config_diff = {}
@logger = LogManager.instance()
@logger.info("Your ingested files will be available in the site in a few minutes. Thank for your patience.", print=TRUE)
return next_config_diff
end
def init
credentials = {}
@erp5_url = task["erp5_url"]
@user = task["user"]
@password = task["password"]
@logger = LogManager.instance()
@wendelin = WendelinClient.new(@erp5_url, @user, @password)
end
def close
end
def add(page)
page.each do |record|
supplier = (record[0].nil? || record[0].empty?) ? "default" : record[0]
dataset = (record[1].nil? || record[1].empty?) ? "default" : record[1]
filename = record[2]
extension = record[3]
eof = record[5]
data_chunk = record[4]
reference = [supplier, dataset, filename, extension, eof].join("/")
begin
if not @wendelin.ingest(reference, data_chunk)
raise "could not ingest"
end
rescue Exception => e
@logger.error(e.backtrace)
raise e
end
end
end
def finish
end
def abort
end
def commit
task_report = {}
return task_report
end
end
end
end
require_relative '../filelogger'
class Index
include Singleton
def initialize()
@index = 0
end
def increase()
@index = @index + 1
end
def get()
return @index
end
end
module Embulk
module Parser
class BinaryParserPlugin < ParserPlugin
Plugin.register_parser("binary", self)
CHUNK_SIZE = 50
MEGA = 1000000
EOF = "EOF"
def self.transaction(config, &control)
tool_dir = config.param('tool_dir', :string, default: ".")
@logger = LogManager.instance()
@logger.setFilename(tool_dir, "parser")
task = {
chunk_size: config.param('chunk_size', :float, default: CHUNK_SIZE) * MEGA,
supplier: config.param("supplier", :string, default: "parser"),
data_set: config.param("data_set", :string),
input_plugin: config.param("storage", :string, default: "parser"),
date: Time.now.strftime("%Y-%m-%d_%H-%M-%S")
}
columns = [
Column.new(0, "supplier", :string),
Column.new(1, "data_set", :string),
Column.new(2, "file", :string),
Column.new(3, "extension", :string),
Column.new(4, "data_chunk", :string),
Column.new(5, "eof", :string)
]
yield(task, columns)
end
def run(file_input)
@index = Index.instance().get()
@logger = LogManager.instance()
while file = file_input.next_file
begin
filename = "file_from_#{task['input_plugin']}_#{task['date']}"
each_chunk(file, filename, task['chunk_size']) do |record|
@page_builder.add(record)
end
@page_builder.finish
Index.instance().increase()
rescue java.lang.OutOfMemoryError
@logger.logOutOfMemoryError(filename)
return
rescue Exception => e
@logger.error("An error occurred during file ingestion: " + e.to_s, print=TRUE)
@logger.error(e.backtrace)
puts "[INFO] For more detailed information, please refer to the log file: " + @logger.getLogPath()
end
end
end
private
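# Streams the raw file coming from the input storage in chunks of 'chunk_size' bytes, Base64-encoding
# each chunk. The storage plugin does not expose file names, so 'filename' is a generated name and the
# extension field carries the file index; the last chunk (or an empty file) is marked with EOF.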
def each_chunk(file, filename, chunk_size=CHUNK_SIZE)
extension = @index.to_s.rjust(3, "0")
npart = 0
next_byte = file.read(1)
first = TRUE
while true
data = next_byte
if not next_byte
if first
values = [task['supplier'], task['data_set'], filename, extension, "", EOF]
yield(values)
end
break
end
first = FALSE
data += file.read(chunk_size)
next_byte = file.read(1)
if not next_byte
eof = EOF
else
npart += 1
eof = npart.to_s.rjust(3, "0")
end
content = Base64.encode64(data)
values = [task['supplier'], task['data_set'], filename, extension, content, eof]
yield(values)
end
end
end
end
end
require 'net/http'
require 'openssl'
require 'yaml'
require 'open-uri'
require_relative 'filelogger'
# class representing a Wendelin client
class WendelinClient
def initialize(erp5_url, user, password)
@erp5_url = erp5_url
@user = user
@password = password
@banned_references_list = []
@logger = LogManager.instance()
@last_ingestion = Time.new - 2
end
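# References have the form "supplier/dataset/filename/extension/eof"; this strips the trailing
# eof part so that all chunks of the same file share one base reference.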
def removeEOF(reference)
root = reference.dup
return root[0...root.rindex('/')]
end
def exists(reference)
uri = URI("#{@erp5_url}/ingestionReferenceExists?reference=#{reference}")
begin
res = open(uri, http_basic_authentication: [@user, @password]).read
rescue Exception => e
@logger.error("An error occurred while checking if reference exists: " + e.to_s)
@logger.error(e.backtrace)
return FALSE
else
return res.to_s == 'TRUE'
end
end
def ingest(reference, data_chunk)
@logger.info("Ingestion reference: #{reference}", print=TRUE)
if @banned_references_list.include? removeEOF(reference)
return FALSE
end
if Time.new - @last_ingestion < 2
# avoid sending ingestions too close together (especially for split ones)
sleep 3
end
if exists(reference)
@logger.info("There is another ingestion already done for the pair data_set-filename. Reference "\
+ removeEOF(reference), print=TRUE)
@logger.info("Rename your reference or delete the older ingestion.", print=TRUE)
@banned_references_list << removeEOF(reference)
return FALSE
end
if reference.include? "#" or reference.include? "+"
raise "Invalid chars in file name. Please rename it."
end
begin
uri = URI("#{@erp5_url}/ingest?reference=#{reference}")
rescue Exception => e
@logger.error("An error occurred while generating url: " + e.to_s)
@logger.error(e.backtrace)
raise "Invalid chars in file name. Please rename it."
end
response = handleRequest(uri, reference, data_chunk)
if response == FALSE
return FALSE
end
@logger.info("Record successfully ingested.", print=TRUE)
@last_ingestion = Time.new
return TRUE
end
def eachDataStreamContentChunk(id, chunk_size)
uri = URI("#{@erp5_url}#{id}/getData")
@logger.info("Downloading...", print=TRUE)
first = TRUE
res = open(uri, http_basic_authentication: [@user, @password]) {
|content|
while true
chunk = content.read(chunk_size)
if chunk.nil?
if first
yield chunk
end
@logger.info("Done", print=TRUE)
break
end
first = FALSE
yield chunk
end
}
end
def getDataStreams(data_set_reference)
@logger.info("Getting file list for dataset '#{data_set_reference}'", print=TRUE)
uri = URI("#{@erp5_url}getDataStreamList?data_set_reference=#{data_set_reference}")
str = handleRequest(uri)
if str == FALSE
@logger.abortExecution()
end
if not str.nil?
str.gsub!(/(\,)(\S)/, "\\1 \\2")
return YAML::load(str)
end
return {'status_code' => 0, 'result' => []}
end
private
def handleRequest(uri, reference=nil, data_chunk=nil)
req = Net::HTTP::Post.new(uri)
req.basic_auth @user, @password
if data_chunk != nil
@logger.info("Setting request form data...", print=TRUE)
begin
req.set_form_data('data_chunk' => data_chunk)
rescue java.lang.OutOfMemoryError
@logger.logOutOfMemoryError(reference)
@banned_references_list << removeEOF(reference)
return FALSE
end
@logger.info("Sending record:'#{reference}'...", print=TRUE)
end
begin
res = Net::HTTP.start(uri.hostname, uri.port,
:use_ssl => (uri.scheme == 'https'),
:verify_mode => OpenSSL::SSL::VERIFY_NONE,
:ssl_timeout => 32000, :open_timeout => 32000, :read_timeout => 32000,
) do |http|
http.request(req)
end
rescue Exception => e
@logger.error("HTTP ERROR: " + e.to_s, print=TRUE)
@logger.error(e.backtrace)
return FALSE
else
if res.kind_of?(Net::HTTPSuccess) # res.code is 2XX
@logger.info("Done", print=TRUE)
return res.body
else
@logger.error("HTTP FAIL - code: #{res.code}", print=TRUE)
if res.code == '500' or res.code == '502' or res.code == '503'
@logger.error("Internal Server Error: if the error persists, please contact the administrator.", print=TRUE)
elsif res.code == '401'
@logger.error("Unauthorized access. Please check your user credentials and try again.", print=TRUE)
@logger.abortExecution()
else
@logger.error("Sorry, an error ocurred. If the error persists, please contact the administrator.", print=TRUE)
#@logger.error(res.value)
end
return FALSE
end
end
end
end
ebulk ingest-download tool help
usage: ebulk <command> <dataset> [options...]
commands:
pull <dataset> Downloads the content of the target dataset from the site into the output folder
push <dataset> Ingests the content of the input folder into a target dataset on the site
-h, --help Tool help
-r, --readme Opens README file
argument:
dataset Mandatory. Unique reference for the target dataset
It must start with a letter, and only alphanumerics, dots ( . ), underscores ( _ ) and hyphens ( - ) are allowed
* For download, the reference must be one of the available datasets on the site
* For ingestion, an existing reference will append the files to the corresponding dataset
* A new reference will create a new dataset on the site
It can also be a path, in which case the last directory will be interpreted as the reference
e.g. pull my_directory/sample/ --> dataset reference will be "sample"
options:
-d, --directory <path> Besides the dataset reference, sets the dataset directory and links that location to the reference
-c, --chunk <chunk> Sets the chunk size (in megabytes) used to split large files
-s, --storage <storage> Uses the selected input storage from this set: [http, ftp, s3]
-cs, --custom-storage Allows the user to set up a new input storage
-a, --advanced Allows editing the Embulk configuration file of the input storage
examples:
ebulk pull <DATASET>
* downloads the content of target dataset
ebulk push <DATASET>
* ingests files into the target dataset
ebulk pull <DATASET> -d <PATH>
* downloads the content of target dataset in target PATH
* future operations on PATH directory will use the DATASET reference implicitly
ebulk push <DATASET> -c 20
* ingests files into the <DATASET> splitting them in chunks of 20MB
ebulk push <DATASET> -s <STORAGE>
* ingests the content of the input storage [http, ftp, s3] into the target dataset
ebulk push <DATASET> -s <STORAGE> --advanced
* allows the user to edit the configuration file of the selected storage
ebulk push <DATASET> --custom-storage
* user can install and configure a new input plugin storage