001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.hadoop.sequencefile;
031
032import java.io.File;
033import java.io.IOException;
034import java.io.InputStream;
035import java.io.PrintStream;
036import java.io.PrintWriter;
037import java.lang.reflect.ParameterizedType;
038import java.math.BigInteger;
039import java.net.URI;
040import java.security.MessageDigest;
041import java.security.NoSuchAlgorithmException;
042import java.util.ArrayList;
043import java.util.HashMap;
044import java.util.Iterator;
045import java.util.LinkedHashMap;
046import java.util.List;
047import java.util.Map;
048import java.util.Map.Entry;
049import java.util.NoSuchElementException;
050import java.util.UUID;
051import java.util.zip.Deflater;
052import java.util.zip.ZipEntry;
053import java.util.zip.ZipOutputStream;
054
055import org.apache.hadoop.conf.Configuration;
056import org.apache.hadoop.fs.FSDataOutputStream;
057import org.apache.hadoop.fs.FileStatus;
058import org.apache.hadoop.fs.FileSystem;
059import org.apache.hadoop.fs.LocalFileSystem;
060import org.apache.hadoop.fs.Path;
061import org.apache.hadoop.fs.PathFilter;
062import org.apache.hadoop.io.SequenceFile;
063import org.apache.hadoop.io.SequenceFile.CompressionType;
064import org.apache.hadoop.io.SequenceFile.Metadata;
065import org.apache.hadoop.io.SequenceFile.Reader;
066import org.apache.hadoop.io.SequenceFile.Writer;
067import org.apache.hadoop.io.Text;
068import org.apache.hadoop.io.Writable;
069import org.apache.hadoop.io.compress.CompressionCodec;
070import org.apache.hadoop.io.compress.DefaultCodec;
071import org.apache.hadoop.util.ReflectionUtils;
072
073/**
074 * Base class for a utility class that deals with specifically typed sequence
075 * files.
076 * 
077 * @author Jonathon Hare (jsh2@ecs.soton.ac.uk)
078 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
079 * 
080 * @param <K>
081 *            Key type
082 * @param <V>
083 *            Value type
084 */
085public abstract class SequenceFileUtility<K extends Writable, V extends Writable> implements Iterable<Entry<K, V>> {
        /** Hadoop configuration passed to readers, writers and ReflectionUtils. */
        protected Configuration config = new Configuration();
        /** Filesystem on which {@link #sequenceFilePath} is read or written. */
        protected FileSystem fileSystem;
        /** Path of the sequence file being read or written. */
        protected Path sequenceFilePath;

        /** Writer for the sequence file; only assigned in write mode. */
        protected Writer writer;
        /**
         * Compression used when writing (defaults to BLOCK). In read mode it is
         * overwritten with the compression type detected in the file.
         */
        protected CompressionType compressionType = CompressionType.BLOCK;

        /** True if this utility was opened for reading, false for writing. */
        protected boolean isReader;

        /** UUID recorded in the file metadata; may be null for a read file without one. */
        protected String uuid;
096
097        public SequenceFileUtility(String uriOrPath, boolean read) throws IOException {
098                setup(convertToURI(uriOrPath), read);
099        }
100
101        public SequenceFileUtility(URI uri, boolean read) throws IOException {
102                setup(uri, read);
103        }
104
105        public SequenceFileUtility(String uriOrPath, CompressionType compressionType) throws IOException {
106                this.compressionType = compressionType;
107                setup(convertToURI(uriOrPath), false);
108        }
109
110        public SequenceFileUtility(URI uri, CompressionType compressionType) throws IOException {
111                this.compressionType = compressionType;
112                setup(uri, false);
113        }
114
115        /**
116         * Get a list of all the reducer outputs in a directory. If the given
117         * path/uri is not a directory, then it is assumed that it is a SequenceFile
118         * and returned directly.
119         * 
120         * @param uriOrPath
121         *            the path or uri
122         * @return the reducer outputs
123         * @throws IOException
124         */
125        public static URI[] getReducerFiles(String uriOrPath) throws IOException {
126                return getFiles(uriOrPath, "part-r-");
127        }
128
129        /**
130         * Get a list of all the sequence files (with a given name prefix) in a
131         * directory. If the given uri is not a directory, then it is assumed that
132         * it is a SequenceFile and returned directly.
133         * 
134         * @param uriOrPath
135         *            the path or uri
136         * @param filenamePrefix
137         *            the prefix of the file name
138         * @return the matching files
139         * @throws IOException
140         */
141        public static URI[] getFiles(String uriOrPath, final String filenamePrefix) throws IOException {
142                final Configuration config = new Configuration();
143                final URI uri = convertToURI(uriOrPath);
144                final FileSystem fs = FileSystem.get(uri, config);
145                final Path path = new Path(uri.toString());
146
147                if (fs.getFileStatus(path).isDir()) {
148                        final FileStatus[] files = fs.listStatus(path, new PathFilter() {
149                                @Override
150                                public boolean accept(Path p) {
151                                        return p.getName().startsWith(filenamePrefix);
152                                }
153                        });
154
155                        final URI[] uris = new URI[files.length];
156                        int i = 0;
157                        for (final FileStatus status : files) {
158                                uris[i++] = status.getPath().toUri();
159                        }
160                        return uris;
161                } else {
162                        return new URI[] { uri };
163                }
164        }
165
166        /**
167         * Get a list of all the sequence files (with a given name prefix) in the
168         * set of input paths. If a given uri is not a directory, then it is assumed
169         * that it is a SequenceFile and returned directly.
170         * 
171         * @param uriOrPaths
172         *            the paths or uris
173         * @param filenamePrefix
174         *            the prefix of the file name
175         * @return the list of sequence files
176         * @throws IOException
177         */
178        public static Path[] getFilePaths(String[] uriOrPaths, String filenamePrefix) throws IOException {
179                final List<Path> pathList = new ArrayList<Path>();
180                for (final String uriOrPath : uriOrPaths) {
181                        final Path[] paths = getFilePaths(uriOrPath, filenamePrefix);
182                        for (final Path path : paths) {
183                                pathList.add(path);
184                        }
185                }
186                return pathList.toArray(new Path[pathList.size()]);
187        }
188
189        /**
190         * Get a list of all the sequence files (with a given name prefix) in the
191         * set of input paths.
192         * <p>
193         * Optionally a subdirectory can be provided; if provided the subdirectory
194         * is appended to each path (i.e. PATH/subdirectory).
195         * <p>
196         * If the given uri is not a directory, then it is assumed that it is a
197         * single SequenceFile and returned directly.
198         * 
199         * @param uriOrPaths
200         *            the URI or path to the directory/file
201         * @param subdir
202         *            the optional subdirectory (may be null)
203         * @param filenamePrefix
204         *            the prefix of the file name
205         * @return the list of sequence files
206         * @throws IOException
207         */
208        public static Path[] getFilePaths(String[] uriOrPaths, String subdir, String filenamePrefix) throws IOException {
209                final List<Path> pathList = new ArrayList<Path>();
210
211                for (String uriOrPath : uriOrPaths) {
212                        if (subdir != null)
213                                uriOrPath += "/" + subdir;
214
215                        final Path[] paths = getFilePaths(uriOrPath, filenamePrefix);
216                        for (final Path path : paths) {
217                                pathList.add(path);
218                        }
219                }
220                return pathList.toArray(new Path[pathList.size()]);
221        }
222
223        /**
224         * Get a list of all the sequence files (with a given name prefix) in a
225         * directory. If the given uri is not a directory, then it is assumed that
226         * it is a SequenceFile and returned directly.
227         * 
228         * @param uriOrPath
229         *            the path or uri
230         * @param filenamePrefix
231         *            the prefix of the file name
232         * @return the list of sequence files
233         * @throws IOException
234         */
235        public static Path[] getFilePaths(String uriOrPath, final String filenamePrefix) throws IOException {
236                final Configuration config = new Configuration();
237                final URI uri = convertToURI(uriOrPath);
238                final FileSystem fs = FileSystem.get(uri, config);
239
240                final Path path = new Path(uri);
241
242                if (fs.getFileStatus(path).isDir()) {
243                        final FileStatus[] files = fs.listStatus(path, new PathFilter() {
244                                @Override
245                                public boolean accept(Path p) {
246                                        return p.getName().startsWith(filenamePrefix);
247                                }
248                        });
249
250                        final Path[] uris = new Path[files.length];
251                        int i = 0;
252                        for (final FileStatus status : files) {
253                                uris[i++] = status.getPath();
254                        }
255                        return uris;
256                } else {
257                        return new Path[] { path };
258                }
259        }
260
261        /**
262         * Get a list of all the sequence files whose names match the given regular
263         * expression in a directory. If the given uri is not a directory, then it
264         * is assumed that it is a SequenceFile and returned directly.
265         * 
266         * @param uriOrPath
267         *            the path or uri
268         * @param regex
269         *            the regular expression to match
270         * @return a list of files
271         * @throws IOException
272         */
273        public static URI[] getFilesRegex(String uriOrPath, final String regex) throws IOException {
274                final Configuration config = new Configuration();
275                final URI uri = convertToURI(uriOrPath);
276                final FileSystem fs = FileSystem.get(uri, config);
277                final Path path = new Path(uri.toString());
278
279                if (fs.getFileStatus(path).isDir()) {
280                        final FileStatus[] files = fs.listStatus(path, new PathFilter() {
281                                @Override
282                                public boolean accept(Path p) {
283                                        return (regex == null || p.getName().matches(regex));
284                                }
285                        });
286
287                        final URI[] uris = new URI[files.length];
288                        int i = 0;
289                        for (final FileStatus status : files) {
290                                uris[i++] = status.getPath().toUri();
291                        }
292                        return uris;
293                } else {
294                        return new URI[] { uri };
295                }
296        }
297
298        /**
299         * Return a list of the keys in the sequence file. Read mode only.
300         * 
301         * @return keys.
302         */
303        @SuppressWarnings("unchecked")
304        public Map<K, Long> listKeysAndOffsets() {
305                if (!isReader) {
306                        throw new UnsupportedOperationException("Cannot read keys in write mode");
307                }
308
309                Reader reader = null;
310                try {
311                        final Map<K, Long> keys = new LinkedHashMap<K, Long>();
312
313                        reader = createReader();
314                        final Class<K> keyClass = (Class<K>) reader.getKeyClass();
315                        K key = ReflectionUtils.newInstance(keyClass, config);
316                        final V val = ReflectionUtils.newInstance((Class<V>) reader.getValueClass(), config);
317                        long start = 0L;
318                        long end = 0L;
319                        while (reader.next(key, val)) {
320                                final long pos = reader.getPosition();
321                                if (pos != end) {
322                                        start = end;
323                                        end = pos;
324                                }
325                                keys.put(key, start);
326                                key = ReflectionUtils.newInstance(keyClass, config);
327                        }
328
329                        return keys;
330                } catch (final Exception e) {
331                        throw new RuntimeException(e);
332                } finally {
333                        if (reader != null)
334                                try {
335                                        reader.close();
336                                } catch (final IOException e1) {
337                                }
338                }
339        }
340
341        /**
342         * Go through a sequence file, applying each
343         * {@link RecordInformationExtractor} to each key, printing out the results
344         * in order to the provided {@link PrintStream}
345         * 
346         * @param extractors
347         *            the {@link RecordInformationExtractor}s to apply
348         * @param stream
349         *            the stream to write to
350         * @param delim
351         */
352        @SuppressWarnings("unchecked")
353        public void extract(List<RecordInformationExtractor> extractors, PrintStream stream, String delim) {
354                if (!isReader) {
355                        throw new UnsupportedOperationException("Cannot read keys in write mode");
356                }
357
358                Reader reader = null;
359                try {
360                        reader = createReader();
361
362                        final Class<K> keyClass = (Class<K>) reader.getKeyClass();
363                        K key = ReflectionUtils.newInstance(keyClass, config);
364                        final V val = ReflectionUtils.newInstance((Class<V>) reader.getValueClass(), config);
365                        long start = 0L;
366                        long end = 0L;
367                        int count = 0;
368                        while (reader.next(key, val)) {
369                                final long pos = reader.getPosition();
370                                if (pos != end) {
371                                        start = end;
372                                        end = pos;
373                                }
374
375                                // Apply the filters and print
376                                String recordString = "";
377                                for (final RecordInformationExtractor extractor : extractors) {
378                                        recordString += extractor.extract(key, val, start, sequenceFilePath) + delim;
379                                }
380
381                                if (recordString.length() >= delim.length())
382                                        recordString = recordString.substring(0, recordString.length() - delim.length());
383
384                                stream.println(recordString);
385                                count++;
386
387                                System.err.printf("\rOutputted: %10d", count);
388                                key = ReflectionUtils.newInstance(keyClass, config);
389                        }
390                        System.err.println();
391                } catch (final Exception e) {
392                        throw new RuntimeException(e);
393                } finally {
394                        if (reader != null)
395                                try {
396                                        reader.close();
397                                } catch (final IOException e1) {
398                                }
399                }
400        }
401
402        public SequenceFileUtility(String uriOrPath, CompressionType compressionType, Map<String, String> metadata)
403                        throws IOException
404        {
405                this.compressionType = compressionType;
406                setup(convertToURI(uriOrPath), false);
407        }
408
409        public SequenceFileUtility(URI uri, CompressionType compressionType, Map<String, String> metadata) throws IOException
410        {
411                this.compressionType = compressionType;
412                setup(uri, false);
413        }
414
415        /**
416         * Converts a string representing a file or uri to a uri object.
417         * 
418         * @param uriOrPath
419         *            uri or path to convert
420         * @return uri
421         */
422        public static URI convertToURI(String uriOrPath) {
423                if (uriOrPath.contains("://")) {
424                        return URI.create(uriOrPath);
425                } else {
426                        return new File(uriOrPath).toURI();
427                }
428        }
429
430        private void setup(URI uri, boolean read) throws IOException {
431                setup(uri, read, null);
432        }
433
434        private void setup(URI uri, boolean read, Map<String, String> metadata) throws IOException {
435                fileSystem = getFileSystem(uri);
436                sequenceFilePath = new Path(uri.toString());
437
438                this.isReader = read;
439
440                if (read) {
441                        Reader reader = null;
442
443                        try {
444                                reader = createReader();
445                                final Text uuidText = reader.getMetadata().get(new Text(MetadataConfiguration.UUID_KEY));
446                                if (uuidText != null)
447                                        uuid = uuidText.toString();
448
449                                if (!reader.isCompressed())
450                                        compressionType = CompressionType.NONE;
451                                else if (reader.isBlockCompressed())
452                                        compressionType = CompressionType.BLOCK;
453                                else
454                                        compressionType = CompressionType.RECORD;
455                        } catch (final Exception e) {
456                                throw new RuntimeException(e);
457                        } finally {
458                                if (reader != null)
459                                        try {
460                                                reader.close();
461                                        } catch (final IOException e1) {
462                                        }
463                        }
464                } else {
465                        if (metadata == null) {
466                                metadata = new HashMap<String, String>();
467                        }
468
469                        if (!metadata.containsKey(MetadataConfiguration.UUID_KEY)) {
470                                uuid = UUID.randomUUID().toString();
471                                metadata.put(MetadataConfiguration.UUID_KEY, uuid);
472                        }
473
474                        // if the output directory is a directory, then create the file
475                        // inside the
476                        // directory with the name given by the uuid
477                        if (fileSystem.exists(sequenceFilePath) && fileSystem.getFileStatus(sequenceFilePath).isDir()) {
478                                sequenceFilePath = new Path(sequenceFilePath, uuid + ".seq");
479                        }
480
481                        writer = createWriter(metadata);
482                }
483        }
484
485        @SuppressWarnings("unchecked")
486        private Writer createWriter(Map<String, String> metadata) throws IOException {
487                final Metadata md = new Metadata();
488
489                for (final Entry<String, String> e : metadata.entrySet()) {
490                        md.set(new Text(e.getKey()), new Text(e.getValue()));
491                }
492                final Class<K> keyClass = (Class<K>) ((ParameterizedType) getClass().getGenericSuperclass())
493                                .getActualTypeArguments()[0];
494                final Class<V> valueClass = (Class<V>) ((ParameterizedType) getClass().getGenericSuperclass())
495                                .getActualTypeArguments()[1];
496
497                return SequenceFile.createWriter(fileSystem, config, sequenceFilePath, keyClass, valueClass, compressionType,
498                                new DefaultCodec(), null,
499                                md);
500        }
501
502        private Reader createReader() throws IOException {
503                // if(this.fileSystem.getFileStatus(sequenceFilePath).isDir())
504                // sequenceFilePath = new Path(sequenceFilePath,"part-r-00000");
505                return new Reader(fileSystem, sequenceFilePath, config);
506        }
507
508        /**
509         * Get the UUID of this file
510         * 
511         * @return UUID
512         */
513        public String getUUID() {
514                return uuid;
515        }
516
517        /**
518         * Return the metadata map. Read mode only.
519         * 
520         * @return metadata
521         */
522        public Map<Text, Text> getMetadata() {
523                if (!isReader) {
524                        throw new UnsupportedOperationException("Cannot read metadata in write mode");
525                }
526
527                Reader reader = null;
528                try {
529                        reader = createReader();
530                        final Map<Text, Text> metadata = reader.getMetadata().getMetadata();
531                        return metadata;
532                } catch (final Exception e) {
533                        throw new RuntimeException(e);
534                } finally {
535                        if (reader != null)
536                                try {
537                                        reader.close();
538                                } catch (final IOException e1) {
539                                }
540                }
541        }
542
543        /**
544         * Return a list of the keys in the sequence file. Read mode only.
545         * 
546         * @return keys.
547         */
548        @SuppressWarnings("unchecked")
549        public List<K> listKeys() {
550                if (!isReader) {
551                        throw new UnsupportedOperationException("Cannot read keys in write mode");
552                }
553
554                Reader reader = null;
555                try {
556                        final List<K> keys = new ArrayList<K>();
557
558                        reader = createReader();
559                        final Class<K> keyClass = (Class<K>) reader.getKeyClass();
560                        K key = ReflectionUtils.newInstance(keyClass, config);
561
562                        while (reader.next(key)) {
563                                keys.add(key);
564                                key = ReflectionUtils.newInstance(keyClass, config);
565                        }
566
567                        return keys;
568                } catch (final Exception e) {
569                        throw new RuntimeException(e);
570                } finally {
571                        if (reader != null)
572                                try {
573                                        reader.close();
574                                } catch (final IOException e1) {
575                                }
576                }
577        }
578
579        /**
580         * Extracts file to a directory. Read mode only.
581         * 
582         * @param uriOrPath
583         *            path or uri to extract to.
584         * @throws IOException
585         */
586        public void exportData(String uriOrPath) throws IOException {
587                exportData(uriOrPath, NamingStrategy.KEY, new ExtractionState(), false, 0);
588        }
589
590        /**
591         * Extracts file to a directory. Read mode only.
592         * 
593         * @param uriOrPath
594         *            path or uri to extract to.
595         * @param naming
596         *            the naming strategy
597         * @param extrState
598         *            the extraction state
599         * @param addExtension
600         *            if true, then file extensions are added to each record
601         *            automatically
602         * @param offset
603         *            offset from which to start. Can be used to reduce number of
604         *            files extracted.
605         * @throws IOException
606         */
607        public void exportData(String uriOrPath, NamingStrategy naming, ExtractionState extrState, boolean addExtension,
608                        long offset)
609                        throws IOException
610        {
611                FileSystem fs = null;
612                Path p = null;
613
614                if (uriOrPath != null) {
615                        final URI uri = convertToURI(uriOrPath);
616
617                        fs = getFileSystem(uri);
618                        p = new Path(uri.toString());
619                }
620
621                exportData(fs, p, naming, extrState, addExtension, offset);
622        }
623
624        public static ZipOutputStream openZipOutputStream(String uriOrPath) throws IOException {
625                final URI uri = convertToURI(uriOrPath);
626
627                final FileSystem fs = getFileSystem(uri, new Configuration());
628                final Path path = new Path(uri.toString());
629
630                final ZipOutputStream zos = new ZipOutputStream(fs.create(path));
631                zos.setLevel(Deflater.BEST_COMPRESSION);
632                return zos;
633        }
634
635        /**
636         * Extracts file to a directory. Read mode only.
637         * 
638         * @param uriOrPath
639         *            path or uri to extract to.
640         * @param naming
641         *            the naming strategy
642         * @param state
643         *            the extraction state
644         * @param addExtension
645         *            if true, then file extensions are added to each record
646         *            automatically
647         * @param offset
648         *            offset from which to start. Can be used to reduce number of
649         *            files extracted.
650         * @throws IOException
651         */
652        public void exportDataToZip(String uriOrPath, NamingStrategy naming, ExtractionState state, boolean addExtension,
653                        long offset)
654                        throws IOException
655        {
656                if (uriOrPath != null) {
657
658                        ZipOutputStream zos = null;
659                        try {
660                                zos = openZipOutputStream(uriOrPath);
661                                exportDataToZip(zos, naming, state, addExtension, offset);
662                        } finally {
663                                if (zos != null)
664                                        try {
665                                                zos.close();
666                                        } catch (final IOException e) {
667                                        }
668                                ;
669                        }
670                }
671        }
672
673        /**
674         * Extracts file to a zip file. Read mode only.
675         * 
676         * @param zos
677         *            The {@link ZipOutputStream} to write to
678         * @param naming
679         *            The naming strategy
680         * @param extrState
681         *            The extration state
682         * @param addExtension
683         *            if true, then file extensions are added to each record
684         *            automatically
685         * @param offset
686         *            offset from which to start. Can be used to reduce number of
687         *            files extracted.
688         * @throws IOException
689         */
690        public void exportDataToZip(ZipOutputStream zos, NamingStrategy naming, ExtractionState extrState,
691                        boolean addExtension, long offset)
692                        throws IOException
693        {
694                if (!isReader) {
695                        throw new UnsupportedOperationException("Cannot read keys in write mode");
696                }
697
698                Reader reader = null;
699                try {
700                        reader = createReader();
701                        if (offset > 0)
702                                reader.seek(offset);
703
704                        @SuppressWarnings("unchecked")
705                        final K key = ReflectionUtils.newInstance((Class<K>) reader.getKeyClass(), config);
706                        @SuppressWarnings("unchecked")
707                        final V val = ReflectionUtils.newInstance((Class<V>) reader.getValueClass(), config);
708
709                        while (reader.next(key)) {
710
711                                if (extrState.allowNext()) {
712                                        reader.getCurrentValue(val);
713
714                                        String name = naming.getName(key, val, extrState, addExtension);
715
716                                        while (name.startsWith("/"))
717                                                name = name.substring(1);
718
719                                        final ZipEntry ze = new ZipEntry(name);
720                                        zos.putNextEntry(ze);
721                                        writeZipData(zos, val);
722                                        zos.closeEntry();
723
724                                        extrState.tick();
725                                } else {
726                                        extrState.tick();
727                                }
728                                if (extrState.isFinished())
729                                        break;
730                        }
731                } catch (final Exception e) {
732                        throw new RuntimeException(e);
733                } finally {
734                        if (reader != null)
735                                try {
736                                        reader.close();
737                                } catch (final IOException e1) {
738                                }
739                }
740        }
741
742        /**
743         * Extracts file to a directory. Read mode only.
744         * 
745         * @param fs
746         *            filesystem of output file
747         * @param dirPath
748         *            path to extract to
749         */
750        public void exportData(FileSystem fs, Path dirPath) {
751                exportData(fs, dirPath, NamingStrategy.KEY, new ExtractionState(), false, 0);
752        }
753
754        /**
755         * Extracts file to a directory. Read mode only.
756         * 
757         * @param fs
758         *            filesystem of output file
759         * @param dirPath
760         *            path to extract to
761         * @param naming
762         *            the naming strategy
763         * @param extrState
764         *            the extraction state
765         * @param addExtension
766         *            if true, then file extensions are added to each record
767         *            automatically
768         * @param offset
769         *            offset from which to start. Can be used to reduce number of
770         *            files extracted.
771         */
772        @SuppressWarnings("unchecked")
773        public void exportData(FileSystem fs, Path dirPath, NamingStrategy naming, ExtractionState extrState,
774                        boolean addExtension, long offset)
775        {
776                if (!isReader) {
777                        throw new UnsupportedOperationException("Cannot read keys in write mode");
778                }
779
780                Reader reader = null;
781                try {
782                        if (fs != null)
783                                fs.mkdirs(dirPath);
784
785                        reader = createReader();
786                        if (offset > 0)
787                                reader.seek(offset);
788
789                        final K key = ReflectionUtils.newInstance((Class<K>) reader.getKeyClass(), config);
790                        final V val = ReflectionUtils.newInstance((Class<V>) reader.getValueClass(), config);
791
792                        while (reader.next(key)) {
793
794                                if (extrState.allowNext()) {
795                                        reader.getCurrentValue(val);
796                                        if (dirPath != null) {
797                                                String name = naming.getName(key, val, extrState, addExtension);
798                                                if (name.startsWith("/"))
799                                                        name = "." + name;
800
801                                                final Path outFilePath = new Path(dirPath, name);
802                                                // System.out.println("NP: " + np);
803                                                System.out.println("Path: " + outFilePath);
804                                                writeFile(fs, outFilePath, val);
805                                                extrState.tick();
806                                        } else {
807                                                System.out.println(key.toString());
808                                                printFile(val);
809                                                extrState.tick();
810                                        }
811                                } else {
812                                        extrState.tick();
813                                }
814                                if (extrState.isFinished())
815                                        break;
816                        }
817                } catch (final Exception e) {
818                        throw new RuntimeException(e);
819                } finally {
820                        if (reader != null)
821                                try {
822                                        reader.close();
823                                } catch (final IOException e1) {
824                                }
825                }
826        }
827
828        @SuppressWarnings("unchecked")
829        public void exportData(NamingStrategy np, ExtractionState nps, long offset, KeyValueDump<K, V> dump) {
830                if (!isReader) {
831                        throw new UnsupportedOperationException("Cannot read keys in write mode");
832                }
833
834                Reader reader = null;
835                try {
836
837                        reader = createReader();
838                        if (offset > 0)
839                                reader.seek(offset);
840
841                        final K key = ReflectionUtils.newInstance((Class<K>) reader.getKeyClass(), config);
842                        final V val = ReflectionUtils.newInstance((Class<V>) reader.getValueClass(), config);
843
844                        while (reader.next(key)) {
845
846                                if (nps.allowNext()) {
847                                        reader.getCurrentValue(val);
848                                        dump.dumpValue(key, val);
849                                }
850                                nps.tick();
851                                if (nps.isFinished())
852                                        break;
853                        }
854                } catch (final Exception e) {
855                        throw new RuntimeException(e);
856                } finally {
857                        if (reader != null)
858                                try {
859                                        reader.close();
860                                } catch (final IOException e1) {
861                                }
862                }
863        }
864
865        /**
866         * Close the underlying writer. Does nothing in read mode.
867         * 
868         * @throws IOException
869         */
870        public void close() throws IOException {
871                if (writer != null)
872                        writer.close();
873        }
874
875        /**
876         * Get number of records in file. Read mode only.
877         * 
878         * @return number of records
879         */
880        public long getNumberRecords() {
881                if (!isReader) {
882                        throw new UnsupportedOperationException("Cannot read keys in write mode");
883                }
884
885                Reader reader = null;
886                try {
887                        reader = createReader();
888
889                        final Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), config);
890
891                        long count = 0;
892                        while (reader.next(key)) {
893                                count++;
894                        }
895                        return count;
896                } catch (final Exception e) {
897                        throw new RuntimeException(e);
898                } finally {
899                        if (reader != null)
900                                try {
901                                        reader.close();
902                                } catch (final IOException e1) {
903                                }
904                }
905        }
906
907        /**
908         * @return the compression codec in use for this file.
909         */
910        public Class<? extends CompressionCodec> getCompressionCodecClass() {
911                if (!isReader)
912                        return DefaultCodec.class;
913
914                Reader reader = null;
915                try {
916                        reader = createReader();
917                        if (reader.getCompressionCodec() == null)
918                                return null;
919                        return reader.getCompressionCodec().getClass();
920                } catch (final Exception e) {
921                        throw new RuntimeException(e);
922                } finally {
923                        if (reader != null)
924                                try {
925                                        reader.close();
926                                } catch (final IOException e1) {
927                                }
928                }
929        }
930
931        /**
932         * @return he compression mode used for this sequence file.
933         */
934        public CompressionType getCompressionType() {
935                return compressionType;
936        }
937
938        /**
939         * Get the filesystem associated with a uri.
940         * 
941         * @param uri
942         * @return the filesystem
943         * @throws IOException
944         */
945        public FileSystem getFileSystem(URI uri) throws IOException {
946                return getFileSystem(uri, config);
947        }
948
949        /**
950         * Get the filesystem associated with a uri.
951         * 
952         * @param uri
953         * @param config
954         * @return the filesystem
955         * @throws IOException
956         */
957        public static FileSystem getFileSystem(URI uri, Configuration config) throws IOException {
958                FileSystem fs = FileSystem.get(uri, config);
959                if (fs instanceof LocalFileSystem)
960                        fs = ((LocalFileSystem) fs).getRaw();
961                return fs;
962        }
963
964        /**
965         * Get a path from a uri.
966         * 
967         * @param uri
968         * @return the path
969         * @throws IOException
970         */
971        public Path getPath(URI uri) throws IOException {
972                return new Path(uri.toString());
973        }
974
975        /**
976         * Get the MD5 sum of a file
977         * 
978         * @param fs
979         * @param p
980         * @return the md5sum
981         */
982        public static String md5sum(FileSystem fs, Path p) {
983                MessageDigest digest;
984                try {
985                        digest = MessageDigest.getInstance("MD5");
986                } catch (final NoSuchAlgorithmException e1) {
987                        throw new RuntimeException(e1);
988                }
989
990                InputStream is = null;
991                try {
992                        final byte[] buffer = new byte[8192];
993                        int read = 0;
994
995                        is = fs.open(p);
996                        while ((read = is.read(buffer)) > 0) {
997                                digest.update(buffer, 0, read);
998                        }
999                        final byte[] md5sum = digest.digest();
1000
1001                        final BigInteger bigInt = new BigInteger(1, md5sum);
1002                        return bigInt.toString(16);
1003                } catch (final IOException e) {
1004                        throw new RuntimeException("Unable to process file for MD5", e);
1005                } finally {
1006                        try {
1007                                if (is != null)
1008                                        is.close();
1009                        } catch (final IOException e) {
1010                        }
1011                }
1012        }
1013
        /**
         * Reads the file at {@code path} into a value that {@link #appendFile}
         * appends to the sequence file.
         * 
         * @param fs
         *            filesystem containing the file
         * @param path
         *            file to read
         * @return the value to append
         * @throws IOException
         *             if reading fails
         */
        protected abstract V readFile(FileSystem fs, Path path) throws IOException;
1015
        /**
         * Writes a record value to the file at {@code path}. Used when records
         * are extracted to a directory.
         * 
         * @param fs
         *            filesystem to write to
         * @param path
         *            destination file
         * @param value
         *            the record value
         * @throws IOException
         *             if writing fails
         */
        protected abstract void writeFile(FileSystem fs, Path path, V value) throws IOException;
1017
        /**
         * Writes a record value as the data of the current zip entry. The caller
         * opens the entry before this call and closes it afterwards.
         * 
         * @param zos
         *            the zip stream positioned inside an open entry
         * @param value
         *            the record value
         * @throws IOException
         *             if writing fails
         */
        protected abstract void writeZipData(ZipOutputStream zos, V value) throws IOException;
1019
        /**
         * Outputs a record value when no output location was given (extraction
         * with a null directory, or a find-and-export without a destination).
         * NOTE(review): presumably writes to standard output; confirm in the
         * implementations.
         * 
         * @param value
         *            the record value
         * @throws IOException
         *             if output fails
         */
        protected abstract void printFile(V value) throws IOException;
1021
1022        /**
1023         * Append data read from a file to the sequence file.
1024         * 
1025         * @param key
1026         * @param fs
1027         * @param p
1028         * @throws IOException
1029         */
1030        public void appendFile(K key, FileSystem fs, Path p) throws IOException {
1031                if (isReader) {
1032                        throw new UnsupportedOperationException("Cannot write data in read mode");
1033                }
1034
1035                writer.append(key, readFile(fs, p));
1036        }
1037
1038        /**
1039         * Append data to a sequence file.
1040         * 
1041         * @param key
1042         * @param value
1043         * @throws IOException
1044         */
1045        public void appendData(K key, V value) throws IOException {
1046                if (isReader) {
1047                        throw new UnsupportedOperationException("Cannot write data in read mode");
1048                }
1049                writer.append(key, value);
1050        }
1051
        /**
         * Interface for objects that can make a sequence-file key from a path.
         * 
         * @param <K>
         *            the key type produced
         */
        public interface KeyProvider<K> {
                /**
                 * Make a key for a file.
                 * 
                 * @param fs
                 *            filesystem containing the file
                 * @param path
                 *            path of the file
                 * @return the key
                 */
                K getKey(FileSystem fs, Path path);

                /**
                 * Make a key for a file found beneath a base directory.
                 * Implementations may use {@code base} to build a relative key
                 * or ignore it.
                 * 
                 * @param fs
                 *            filesystem containing the file
                 * @param path
                 *            path of the file
                 * @param base
                 *            base directory the file was found under
                 * @return the key
                 */
                K getKey(FileSystem fs, Path path, Path base);
        }
1062
1063        /**
1064         * A class that provides Text keys by calculating a UUID from the MD5 of a
1065         * file
1066         */
1067        public static class MD5UUIDKeyProvider implements KeyProvider<Text> {
1068                @Override
1069                public Text getKey(FileSystem fs, Path path) {
1070                        final UUID uuid = UUID.nameUUIDFromBytes(SequenceFileUtility.md5sum(fs, path).getBytes());
1071                        return new Text(uuid.toString());
1072                }
1073
1074                @Override
1075                public Text getKey(FileSystem fs, Path path, Path base) {
1076                        return this.getKey(fs, path);
1077                }
1078        }
1079
1080        /**
1081         * A class that provides Text keys from the name of a file
1082         */
1083        public static class FilenameKeyProvider implements KeyProvider<Text> {
1084                @Override
1085                public Text getKey(FileSystem fs, Path path) {
1086                        return new Text(path.getName());
1087                }
1088
1089                @Override
1090                public Text getKey(FileSystem fs, Path path, Path base) {
1091                        return this.getKey(fs, path);
1092                }
1093        }
1094
1095        /**
1096         * A class that provides Text keys from the relative path + name of a file
1097         */
1098        public static class RelativePathFilenameKeyProvider implements KeyProvider<Text> {
1099                @Override
1100                public Text getKey(FileSystem fs, Path path) {
1101                        return new Text(path.toUri().getPath());
1102                }
1103
1104                @Override
1105                public Text getKey(FileSystem fs, Path path, Path base) {
1106                        return new Text(path.toUri().getPath().substring(base.toUri().getPath().length()));
1107                }
1108        }
1109
1110        /**
1111         * Append files to a sequenceFile.
1112         * 
1113         * @param fs
1114         *            The filesystem of the files being added.
1115         * @param path
1116         *            The path of the file(s) being added.
1117         * @param recurse
1118         *            If true, then subdirectories are also searched
1119         * @param pathFilter
1120         *            Filter for omitting files. Can be null.
1121         * @param keyProvider
1122         *            Object that can return a key for a given file.
1123         * @return Paths and their respective keys for files that were added.
1124         * @throws IOException
1125         */
1126        public Map<Path, K> appendFiles(FileSystem fs, Path path, boolean recurse, PathFilter pathFilter,
1127                        KeyProvider<K> keyProvider)
1128                        throws IOException
1129        {
1130                final LinkedHashMap<Path, K> addedFiles = new LinkedHashMap<Path, K>();
1131                appendFiles(fs, path, recurse, pathFilter, keyProvider, addedFiles);
1132                return addedFiles;
1133        }
1134
1135        private void appendFiles(final FileSystem fs, Path path, boolean recurse, PathFilter pathFilter,
1136                        KeyProvider<K> keyProvider,
1137                        Map<Path, K> addedFiles) throws IOException
1138        {
1139                if (fs.isFile(path)) {
1140                        if (pathFilter == null || pathFilter.accept(path)) {
1141                                final K key = keyProvider.getKey(fs, path);
1142                                appendFile(key, fs, path);
1143                                addedFiles.put(path, key);
1144                        }
1145                } else if (recurse) {
1146                        // fs.listStatus(path);
1147                        final FileStatus[] status = fs.listStatus(path, new PathFilter() {
1148
1149                                @Override
1150                                public boolean accept(Path potential) {
1151                                        try {
1152                                                fs.getStatus(potential);
1153                                                return true;
1154                                        } catch (final IOException e) {
1155                                                return false;
1156                                        }
1157                                }
1158
1159                        });
1160                        for (final FileStatus stat : status) {
1161                                appendFiles(fs, stat.getPath(), path.getParent(), pathFilter, keyProvider, addedFiles);
1162                        }
1163                }
1164        }
1165
1166        private void appendFiles(FileSystem fs, Path path, Path base, PathFilter pathFilter, KeyProvider<K> keyProvider,
1167                        Map<Path, K> addedFiles)
1168                        throws IOException
1169        {
1170                if (fs.isFile(path)) {
1171                        if (pathFilter == null || pathFilter.accept(path)) {
1172                                final K key = keyProvider.getKey(fs, path, base);
1173                                appendFile(key, fs, path);
1174                                addedFiles.put(path, key);
1175                        }
1176                } else {
1177                        try {
1178                                final FileStatus[] status = fs.listStatus(path);
1179
1180                                for (final FileStatus stat : status) {
1181                                        appendFiles(fs, stat.getPath(), base, pathFilter, keyProvider, addedFiles);
1182                                }
1183                        } catch (final Throwable e) {
1184                                System.err.println("Failed listing status on path: " + path);
1185                        }
1186                }
1187        }
1188
1189        public void writePathMap(Map<Path, K> map) throws IOException {
1190                final Path p = new Path(sequenceFilePath.getParent(), sequenceFilePath.getName().substring(0,
1191                                sequenceFilePath.getName().lastIndexOf("."))
1192                                + "-map.txt");
1193                FSDataOutputStream dos = null;
1194                PrintWriter pw = null;
1195
1196                try {
1197                        dos = fileSystem.create(p);
1198                        pw = new PrintWriter(dos);
1199
1200                        for (final Entry<Path, K> e : map.entrySet()) {
1201                                pw.println(e.getValue() + " " + e.getKey());
1202                        }
1203                } finally {
1204                        if (pw != null)
1205                                pw.close();
1206                        if (dos != null)
1207                                try {
1208                                        dos.close();
1209                                } catch (final IOException e) {
1210                                }
1211                }
1212        }
1213
1214        /**
1215         * Search for the record identified by queryKey.
1216         * 
1217         * @param queryKey
1218         *            the key.
1219         * @param offset
1220         *            the offset from which to commence search
1221         * @return the found value, or null.
1222         */
1223        @SuppressWarnings("unchecked")
1224        public V find(K queryKey, long offset) {
1225                if (!isReader) {
1226                        throw new UnsupportedOperationException("Cannot find key in write mode");
1227                }
1228
1229                Reader reader = null;
1230                try {
1231                        reader = createReader();
1232                        if (offset > 0)
1233                                reader.seek(offset);
1234
1235                        final K key = ReflectionUtils.newInstance((Class<K>) reader.getKeyClass(), config);
1236
1237                        while (reader.next(key)) {
1238                                System.out.println(key);
1239                                if (key.equals(queryKey)) {
1240                                        final V val = ReflectionUtils.newInstance((Class<V>) reader.getValueClass(), config);
1241
1242                                        reader.getCurrentValue(val);
1243
1244                                        return val;
1245                                }
1246                        }
1247                        return null;
1248                } catch (final Exception e) {
1249                        throw new RuntimeException(e);
1250                } finally {
1251                        if (reader != null)
1252                                try {
1253                                        reader.close();
1254                                } catch (final IOException e1) {
1255                                }
1256                }
1257        }
1258
1259        /**
1260         * Search for the record identified by queryKey. Uses a linear search from
1261         * the beginning of the file.
1262         * 
1263         * @param queryKey
1264         * @return the found value, or null.
1265         */
1266        public V find(K queryKey) {
1267                return find(queryKey, 0);
1268        }
1269
1270        /**
1271         * Find a record and write the value to a file.
1272         * 
1273         * @param key
1274         * @param uriOrPath
1275         * @param offset
1276         * @return false if record not found, true otherwise.
1277         * @throws IOException
1278         */
1279        public boolean findAndExport(K key, String uriOrPath, long offset) throws IOException {
1280                FileSystem fs = null;
1281                Path p = null;
1282
1283                if (uriOrPath != null) {
1284                        final URI uri = convertToURI(uriOrPath);
1285
1286                        fs = getFileSystem(uri);
1287                        p = new Path(uri.toString());
1288                }
1289
1290                return findAndExport(key, fs, p, offset);
1291        }
1292
1293        /**
1294         * Find a record and write the value to a file.
1295         * 
1296         * @param key
1297         * @param fs
1298         * @param dirPath
1299         * @param offset
1300         * @return false if record not found, true otherwise.
1301         * @throws IOException
1302         */
1303        public boolean findAndExport(K key, FileSystem fs, Path dirPath, long offset) throws IOException {
1304                final V value = find(key, offset);
1305
1306                if (value == null)
1307                        return false;
1308
1309                if (fs != null && fs != null) {
1310                        final Path outFilePath = new Path(dirPath, key.toString());
1311                        writeFile(fs, outFilePath, value);
1312                } else {
1313                        printFile(value);
1314                }
1315
1316                return true;
1317        }
1318
1319        public Path getSequenceFilePath() {
1320                return sequenceFilePath;
1321        }
1322
1323        class SequenceFileEntry implements Entry<K, V> {
1324                K key;
1325                V value;
1326
1327                public SequenceFileEntry(K k, V v) {
1328                        key = k;
1329                        value = v;
1330                }
1331
1332                @Override
1333                public K getKey() {
1334                        return key;
1335                }
1336
1337                @Override
1338                public V getValue() {
1339                        return value;
1340                }
1341
1342                @Override
1343                public V setValue(V value) {
1344                        this.value = value;
1345                        return value;
1346                }
1347        }
1348
1349        class SequenceFileIterator implements Iterator<Entry<K, V>> {
1350                Reader reader = null;
1351                Entry<K, V> next;
1352                boolean shouldMove = true;
1353
1354                @SuppressWarnings("unchecked")
1355                public SequenceFileIterator() {
1356                        try {
1357                                reader = createReader();
1358
1359                                next = new SequenceFileEntry(ReflectionUtils.newInstance((Class<K>) reader.getKeyClass(), config),
1360                                                ReflectionUtils.newInstance(
1361                                                                (Class<V>) reader.getValueClass(), config));
1362                        } catch (final IOException e) {
1363                                throw new RuntimeException(e);
1364                        }
1365                }
1366
1367                @Override
1368                public boolean hasNext() {
1369                        tryGetNext();
1370                        return next != null;
1371                }
1372
1373                private void tryGetNext() {
1374                        if (next != null && shouldMove) {
1375                                shouldMove = false;
1376                                try {
1377                                        if (!reader.next(next.getKey(), next.getValue())) {
1378                                                next = null;
1379                                                try {
1380                                                        reader.close();
1381                                                } catch (final IOException e1) {
1382                                                }
1383                                        }
1384                                } catch (final IOException e) {
1385                                        try {
1386                                                reader.close();
1387                                        } catch (final IOException e1) {
1388                                        }
1389                                        throw new RuntimeException(e);
1390                                }
1391                        }
1392                }
1393
1394                @Override
1395                public Entry<K, V> next() {
1396                        tryGetNext();
1397
1398                        if (next == null) {
1399                                throw new NoSuchElementException();
1400                        }
1401                        shouldMove = true;
1402                        return next;
1403                }
1404
1405                @Override
1406                public void remove() {
1407                }
1408        }
1409
1410        @Override
1411        public Iterator<Entry<K, V>> iterator() {
1412                if (!isReader) {
1413                        throw new UnsupportedOperationException("Cannot iterate in write mode");
1414                }
1415
1416                return new SequenceFileIterator();
1417        }
1418}