I hit the same issue, and after some investigation I figured out the following workaround, which works.
The problem is due to the internal implementation of sequence file creation and the fact that it uses the file length, which is only updated once per block of 64 MB.
So I created the following class to create the reader. It wraps the Hadoop FileSystem in my own implementation, which overrides the getLength method to return the actual file length instead:
/**
 * Helper for opening a {@link SequenceFile.Reader} through a wrapping
 * {@link FileSystem} whose {@code getLength} consults the DFS input stream's
 * own file length in addition to the length reported by the wrapped file
 * system.
 */
public class SequenceFileUtil {

    /**
     * Creates a sequence file reader for {@code path}, backed by a wrapper
     * around the file system obtained from {@code conf}.
     *
     * @param conf configuration used both to look up the file system and to
     *             configure the reader
     * @param path location of the sequence file to read
     * @return a reader opened through the length-correcting wrapper
     * @throws IOException if the file system or the reader cannot be created
     */
    public SequenceFile.Reader createReader(Configuration conf, Path path) throws IOException {
        WrappedFileSystem fileSystem = new WrappedFileSystem(FileSystem.get(conf));
        return new SequenceFile.Reader(fileSystem, path, conf);
    }

    /**
     * Delegating file system: every operation is forwarded unchanged to the
     * nested file system, except {@link #getLength(Path)}.
     *
     * Declared static because it never touches the enclosing instance.
     */
    private static final class WrappedFileSystem extends FileSystem {

        /** The real file system all calls are forwarded to. */
        private final FileSystem nestedFs;

        public WrappedFileSystem(FileSystem fs) {
            this.nestedFs = fs;
        }

        @Override
        public URI getUri() {
            return nestedFs.getUri();
        }

        @Override
        public FSDataInputStream open(Path f, int bufferSize) throws IOException {
            return nestedFs.open(f, bufferSize);
        }

        @Override
        public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
                                         int bufferSize, short replication, long blockSize,
                                         Progressable progress) throws IOException {
            return nestedFs.create(f, permission, overwrite, bufferSize, replication, blockSize, progress);
        }

        @Override
        public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
            return nestedFs.append(f, bufferSize, progress);
        }

        @Override
        public boolean rename(Path src, Path dst) throws IOException {
            return nestedFs.rename(src, dst);
        }

        @Override
        public boolean delete(Path path) throws IOException {
            return nestedFs.delete(path);
        }

        @Override
        public boolean delete(Path f, boolean recursive) throws IOException {
            return nestedFs.delete(f, recursive);
        }

        @Override
        public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
            return nestedFs.listStatus(f);
        }

        @Override
        public void setWorkingDirectory(Path new_dir) {
            nestedFs.setWorkingDirectory(new_dir);
        }

        @Override
        public Path getWorkingDirectory() {
            return nestedFs.getWorkingDirectory();
        }

        @Override
        public boolean mkdirs(Path f, FsPermission permission) throws IOException {
            return nestedFs.mkdirs(f, permission);
        }

        @Override
        public FileStatus getFileStatus(Path f) throws IOException {
            return nestedFs.getFileStatus(f);
        }

        /**
         * Returns the larger of the length reported by the nested file system
         * and the length reported by a freshly opened DFS input stream.
         *
         * The stream's length wins when it is larger, which may be the case
         * while the file still has uncompleted blocks.
         *
         * The DFS client and the stream are opened only for this query and are
         * closed before returning, so repeated calls do not leak them.
         *
         * @param f file whose length is requested
         * @return the greater of the two reported lengths, in bytes
         * @throws IOException if the file cannot be opened or queried, or if
         *                     closing the temporary client or stream fails
         */
        @Override
        public long getLength(Path f) throws IOException {
            DFSClient client = new DFSClient(nestedFs.getConf());
            try {
                DFSClient.DFSInputStream in = client.open(f.toUri().getPath());
                try {
                    long streamLength = in.getFileLength();
                    long reportedLength = nestedFs.getLength(f);
                    // We might have uncompleted blocks, in which case the
                    // stream knows about more bytes than the reported length.
                    return Math.max(reportedLength, streamLength);
                } finally {
                    in.close();
                }
            } finally {
                client.close();
            }
        }
    }
}