Commit 407b5b1b authored by Klaus Wölfel's avatar Klaus Wölfel

Add file_size option to only process files with given file_size in bytes

parent 3b126b2d
package org.embulk.input.filename; package org.embulk.input.filename;
import java.util.List; import java.util.List;
import java.util.ArrayList;
import java.util.Collections;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
...@@ -13,7 +15,13 @@ import java.nio.file.SimpleFileVisitor; ...@@ -13,7 +15,13 @@ import java.nio.file.SimpleFileVisitor;
import java.nio.file.FileVisitResult; import java.nio.file.FileVisitResult;
import java.nio.file.attribute.BasicFileAttributes; import java.nio.file.attribute.BasicFileAttributes;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.base.Optional;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigInject;
import org.embulk.config.ConfigSource;
import org.embulk.config.ConfigDiff;
import org.embulk.config.TaskReport; import org.embulk.config.TaskReport;
import org.embulk.config.Task; import org.embulk.config.Task;
import org.embulk.config.TaskSource; import org.embulk.config.TaskSource;
...@@ -84,16 +92,85 @@ class FilenameFileInputStream extends FileInputStream { ...@@ -84,16 +92,85 @@ class FilenameFileInputStream extends FileInputStream {
} }
} }
public class FilenameFileInputPlugin public class FilenameFileInputPlugin implements FileInputPlugin
extends LocalFileInputPlugin
implements FileInputPlugin
{ {
public interface PluginTask extends Task
{
@Config("path_prefix")
String getPathPrefix();
@Config("last_path")
@ConfigDefault("null")
Optional<String> getLastPath();
@Config("file_size")
@ConfigDefault("null")
Optional<Integer> getFileSize();
@Config("follow_symlinks")
@ConfigDefault("false")
boolean getFollowSymlinks();
List<String> getFiles();
void setFiles(List<String> files);
@ConfigInject
BufferAllocator getBufferAllocator();
}
private final Logger log = Exec.getLogger(getClass()); private final Logger log = Exec.getLogger(getClass());
private final static Path CURRENT_DIR = Paths.get(".").normalize(); private final static Path CURRENT_DIR = Paths.get(".").normalize();
@Override @Override
public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control)
{
PluginTask task = config.loadConfig(PluginTask.class);
// list files recursively
List<String> files = listFiles(task);
log.info("Loading files {}", files);
task.setFiles(files);
// number of processors is same with number of files
int taskCount = task.getFiles().size();
return resume(task.dump(), taskCount, control);
}
@Override
public ConfigDiff resume(TaskSource taskSource,
int taskCount,
FileInputPlugin.Control control)
{
PluginTask task = taskSource.loadTask(PluginTask.class);
control.run(taskSource, taskCount);
// build next config
ConfigDiff configDiff = Exec.newConfigDiff();
// last_path
if (task.getFiles().isEmpty()) {
// keep the last value
if (task.getLastPath().isPresent()) {
configDiff.set("last_path", task.getLastPath().get());
}
} else {
List<String> files = new ArrayList<String>(task.getFiles());
Collections.sort(files);
configDiff.set("last_path", files.get(files.size() - 1));
}
return configDiff;
}
@Override
public void cleanup(TaskSource taskSource,
int taskCount,
List<TaskReport> successTaskReports)
{ }
public List<String> listFiles(PluginTask task) public List<String> listFiles(PluginTask task)
{ {
Path pathPrefix = Paths.get(task.getPathPrefix()).normalize(); Path pathPrefix = Paths.get(task.getPathPrefix()).normalize();
...@@ -110,6 +187,7 @@ public class FilenameFileInputPlugin ...@@ -110,6 +187,7 @@ public class FilenameFileInputPlugin
final ImmutableList.Builder<String> builder = ImmutableList.builder(); final ImmutableList.Builder<String> builder = ImmutableList.builder();
final String lastPath = task.getLastPath().orNull(); final String lastPath = task.getLastPath().orNull();
final Integer fileSize = task.getFileSize().orNull();
try { try {
log.info("Listing local files at directory '{}' filtering filename by prefix '{}'", directory.equals(CURRENT_DIR) ? "." : directory.toString(), fileNamePrefix); log.info("Listing local files at directory '{}' filtering filename by prefix '{}'", directory.equals(CURRENT_DIR) ? "." : directory.toString(), fileNamePrefix);
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() { Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
...@@ -141,7 +219,9 @@ public class FilenameFileInputPlugin ...@@ -141,7 +219,9 @@ public class FilenameFileInputPlugin
return FileVisitResult.CONTINUE; return FileVisitResult.CONTINUE;
} else { } else {
if (path.getFileName().toString().startsWith(fileNamePrefix)) { if (path.getFileName().toString().startsWith(fileNamePrefix)) {
builder.add(path.toString()); if (fileSize == null || path.toFile().length() == fileSize) {
builder.add(path.toString());
}
} }
return FileVisitResult.CONTINUE; return FileVisitResult.CONTINUE;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment