001/** 002 * Copyright (c) 2011, The University of Southampton and the individual contributors. 003 * All rights reserved. 004 * 005 * Redistribution and use in source and binary forms, with or without modification, 006 * are permitted provided that the following conditions are met: 007 * 008 * * Redistributions of source code must retain the above copyright notice, 009 * this list of conditions and the following disclaimer. 010 * 011 * * Redistributions in binary form must reproduce the above copyright notice, 012 * this list of conditions and the following disclaimer in the documentation 013 * and/or other materials provided with the distribution. 014 * 015 * * Neither the name of the University of Southampton nor the names of its 016 * contributors may be used to endorse or promote products derived from this 017 * software without specific prior written permission. 018 * 019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR 023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 029 */ 030package org.openimaj.hadoop.sequencefile; 031 032import java.io.File; 033import java.io.IOException; 034import java.io.InputStream; 035import java.io.PrintStream; 036import java.io.PrintWriter; 037import java.lang.reflect.ParameterizedType; 038import java.math.BigInteger; 039import java.net.URI; 040import java.security.MessageDigest; 041import java.security.NoSuchAlgorithmException; 042import java.util.ArrayList; 043import java.util.HashMap; 044import java.util.Iterator; 045import java.util.LinkedHashMap; 046import java.util.List; 047import java.util.Map; 048import java.util.Map.Entry; 049import java.util.NoSuchElementException; 050import java.util.UUID; 051import java.util.zip.Deflater; 052import java.util.zip.ZipEntry; 053import java.util.zip.ZipOutputStream; 054 055import org.apache.hadoop.conf.Configuration; 056import org.apache.hadoop.fs.FSDataOutputStream; 057import org.apache.hadoop.fs.FileStatus; 058import org.apache.hadoop.fs.FileSystem; 059import org.apache.hadoop.fs.LocalFileSystem; 060import org.apache.hadoop.fs.Path; 061import org.apache.hadoop.fs.PathFilter; 062import org.apache.hadoop.io.SequenceFile; 063import org.apache.hadoop.io.SequenceFile.CompressionType; 064import org.apache.hadoop.io.SequenceFile.Metadata; 065import org.apache.hadoop.io.SequenceFile.Reader; 066import org.apache.hadoop.io.SequenceFile.Writer; 067import org.apache.hadoop.io.Text; 068import org.apache.hadoop.io.Writable; 069import org.apache.hadoop.io.compress.CompressionCodec; 070import org.apache.hadoop.io.compress.DefaultCodec; 071import org.apache.hadoop.util.ReflectionUtils; 072 073/** 074 * Base class for a utility class that deals with specifically typed sequence 075 * files. 076 * 077 * @author Jonathon Hare (jsh2@ecs.soton.ac.uk) 078 * @author Sina Samangooei (ss@ecs.soton.ac.uk) 079 * 080 * @param <K> 081 * Key type 082 * @param <V> 083 * Value type 084 */ 085public abstract class SequenceFileUtility<K extends Writable, V extends Writable> implements Iterable<Entry<K, V>> { 086 protected Configuration config = new Configuration(); 087 protected FileSystem fileSystem; 088 protected Path sequenceFilePath; 089 090 protected Writer writer; 091 protected CompressionType compressionType = CompressionType.BLOCK; 092 093 protected boolean isReader; 094 095 protected String uuid; 096 097 public SequenceFileUtility(String uriOrPath, boolean read) throws IOException { 098 setup(convertToURI(uriOrPath), read); 099 } 100 101 public SequenceFileUtility(URI uri, boolean read) throws IOException { 102 setup(uri, read); 103 } 104 105 public SequenceFileUtility(String uriOrPath, CompressionType compressionType) throws IOException { 106 this.compressionType = compressionType; 107 setup(convertToURI(uriOrPath), false); 108 } 109 110 public SequenceFileUtility(URI uri, CompressionType compressionType) throws IOException { 111 this.compressionType = compressionType; 112 setup(uri, false); 113 } 114 115 /** 116 * Get a list of all the reducer outputs in a directory. If the given 117 * path/uri is not a directory, then it is assumed that it is a SequenceFile 118 * and returned directly. 119 * 120 * @param uriOrPath 121 * the path or uri 122 * @return the reducer outputs 123 * @throws IOException 124 */ 125 public static URI[] getReducerFiles(String uriOrPath) throws IOException { 126 return getFiles(uriOrPath, "part-r-"); 127 } 128 129 /** 130 * Get a list of all the sequence files (with a given name prefix) in a 131 * directory. If the given uri is not a directory, then it is assumed that 132 * it is a SequenceFile and returned directly. 133 * 134 * @param uriOrPath 135 * the path or uri 136 * @param filenamePrefix 137 * the prefix of the file name 138 * @return the matching files 139 * @throws IOException 140 */ 141 public static URI[] getFiles(String uriOrPath, final String filenamePrefix) throws IOException { 142 final Configuration config = new Configuration(); 143 final URI uri = convertToURI(uriOrPath); 144 final FileSystem fs = FileSystem.get(uri, config); 145 final Path path = new Path(uri.toString()); 146 147 if (fs.getFileStatus(path).isDir()) { 148 final FileStatus[] files = fs.listStatus(path, new PathFilter() { 149 @Override 150 public boolean accept(Path p) { 151 return p.getName().startsWith(filenamePrefix); 152 } 153 }); 154 155 final URI[] uris = new URI[files.length]; 156 int i = 0; 157 for (final FileStatus status : files) { 158 uris[i++] = status.getPath().toUri(); 159 } 160 return uris; 161 } else { 162 return new URI[] { uri }; 163 } 164 } 165 166 /** 167 * Get a list of all the sequence files (with a given name prefix) in the 168 * set of input paths. If a given uri is not a directory, then it is assumed 169 * that it is a SequenceFile and returned directly. 170 * 171 * @param uriOrPaths 172 * the paths or uris 173 * @param filenamePrefix 174 * the prefix of the file name 175 * @return the list of sequence files 176 * @throws IOException 177 */ 178 public static Path[] getFilePaths(String[] uriOrPaths, String filenamePrefix) throws IOException { 179 final List<Path> pathList = new ArrayList<Path>(); 180 for (final String uriOrPath : uriOrPaths) { 181 final Path[] paths = getFilePaths(uriOrPath, filenamePrefix); 182 for (final Path path : paths) { 183 pathList.add(path); 184 } 185 } 186 return pathList.toArray(new Path[pathList.size()]); 187 } 188 189 /** 190 * Get a list of all the sequence files (with a given name prefix) in the 191 * set of input paths. 192 * <p> 193 * Optionally a subdirectory can be provided; if provided the subdirectory 194 * is appended to each path (i.e. PATH/subdirectory). 195 * <p> 196 * If the given uri is not a directory, then it is assumed that it is a 197 * single SequenceFile and returned directly. 198 * 199 * @param uriOrPaths 200 * the URI or path to the directory/file 201 * @param subdir 202 * the optional subdirectory (may be null) 203 * @param filenamePrefix 204 * the prefix of the file name 205 * @return the list of sequence files 206 * @throws IOException 207 */ 208 public static Path[] getFilePaths(String[] uriOrPaths, String subdir, String filenamePrefix) throws IOException { 209 final List<Path> pathList = new ArrayList<Path>(); 210 211 for (String uriOrPath : uriOrPaths) { 212 if (subdir != null) 213 uriOrPath += "/" + subdir; 214 215 final Path[] paths = getFilePaths(uriOrPath, filenamePrefix); 216 for (final Path path : paths) { 217 pathList.add(path); 218 } 219 } 220 return pathList.toArray(new Path[pathList.size()]); 221 } 222 223 /** 224 * Get a list of all the sequence files (with a given name prefix) in a 225 * directory. If the given uri is not a directory, then it is assumed that 226 * it is a SequenceFile and returned directly. 227 * 228 * @param uriOrPath 229 * the path or uri 230 * @param filenamePrefix 231 * the prefix of the file name 232 * @return the list of sequence files 233 * @throws IOException 234 */ 235 public static Path[] getFilePaths(String uriOrPath, final String filenamePrefix) throws IOException { 236 final Configuration config = new Configuration(); 237 final URI uri = convertToURI(uriOrPath); 238 final FileSystem fs = FileSystem.get(uri, config); 239 240 final Path path = new Path(uri); 241 242 if (fs.getFileStatus(path).isDir()) { 243 final FileStatus[] files = fs.listStatus(path, new PathFilter() { 244 @Override 245 public boolean accept(Path p) { 246 return p.getName().startsWith(filenamePrefix); 247 } 248 }); 249 250 final Path[] uris = new Path[files.length]; 251 int i = 0; 252 for (final FileStatus status : files) { 253 uris[i++] = status.getPath(); 254 } 255 return uris; 256 } else { 257 return new Path[] { path }; 258 } 259 } 260 261 /** 262 * Get a list of all the sequence files whose names match the given regular 263 * expression in a directory. If the given uri is not a directory, then it 264 * is assumed that it is a SequenceFile and returned directly. 265 * 266 * @param uriOrPath 267 * the path or uri 268 * @param regex 269 * the regular expression to match 270 * @return a list of files 271 * @throws IOException 272 */ 273 public static URI[] getFilesRegex(String uriOrPath, final String regex) throws IOException { 274 final Configuration config = new Configuration(); 275 final URI uri = convertToURI(uriOrPath); 276 final FileSystem fs = FileSystem.get(uri, config); 277 final Path path = new Path(uri.toString()); 278 279 if (fs.getFileStatus(path).isDir()) { 280 final FileStatus[] files = fs.listStatus(path, new PathFilter() { 281 @Override 282 public boolean accept(Path p) { 283 return (regex == null || p.getName().matches(regex)); 284 } 285 }); 286 287 final URI[] uris = new URI[files.length]; 288 int i = 0; 289 for (final FileStatus status : files) { 290 uris[i++] = status.getPath().toUri(); 291 } 292 return uris; 293 } else { 294 return new URI[] { uri }; 295 } 296 } 297 298 /** 299 * Return a list of the keys in the sequence file. Read mode only. 300 * 301 * @return keys. 302 */ 303 @SuppressWarnings("unchecked") 304 public Map<K, Long> listKeysAndOffsets() { 305 if (!isReader) { 306 throw new UnsupportedOperationException("Cannot read keys in write mode"); 307 } 308 309 Reader reader = null; 310 try { 311 final Map<K, Long> keys = new LinkedHashMap<K, Long>(); 312 313 reader = createReader(); 314 final Class<K> keyClass = (Class<K>) reader.getKeyClass(); 315 K key = ReflectionUtils.newInstance(keyClass, config); 316 final V val = ReflectionUtils.newInstance((Class<V>) reader.getValueClass(), config); 317 long start = 0L; 318 long end = 0L; 319 while (reader.next(key, val)) { 320 final long pos = reader.getPosition(); 321 if (pos != end) { 322 start = end; 323 end = pos; 324 } 325 keys.put(key, start); 326 key = ReflectionUtils.newInstance(keyClass, config); 327 } 328 329 return keys; 330 } catch (final Exception e) { 331 throw new RuntimeException(e); 332 } finally { 333 if (reader != null) 334 try { 335 reader.close(); 336 } catch (final IOException e1) { 337 } 338 } 339 } 340 341 /** 342 * Go through a sequence file, applying each 343 * {@link RecordInformationExtractor} to each key, printing out the results 344 * in order to the provided {@link PrintStream} 345 * 346 * @param extractors 347 * the {@link RecordInformationExtractor}s to apply 348 * @param stream 349 * the stream to write to 350 * @param delim 351 */ 352 @SuppressWarnings("unchecked") 353 public void extract(List<RecordInformationExtractor> extractors, PrintStream stream, String delim) { 354 if (!isReader) { 355 throw new UnsupportedOperationException("Cannot read keys in write mode"); 356 } 357 358 Reader reader = null; 359 try { 360 reader = createReader(); 361 362 final Class<K> keyClass = (Class<K>) reader.getKeyClass(); 363 K key = ReflectionUtils.newInstance(keyClass, config); 364 final V val = ReflectionUtils.newInstance((Class<V>) reader.getValueClass(), config); 365 long start = 0L; 366 long end = 0L; 367 int count = 0; 368 while (reader.next(key, val)) { 369 final long pos = reader.getPosition(); 370 if (pos != end) { 371 start = end; 372 end = pos; 373 } 374 375 // Apply the filters and print 376 String recordString = ""; 377 for (final RecordInformationExtractor extractor : extractors) { 378 recordString += extractor.extract(key, val, start, sequenceFilePath) + delim; 379 } 380 381 if (recordString.length() >= delim.length()) 382 recordString = recordString.substring(0, recordString.length() - delim.length()); 383 384 stream.println(recordString); 385 count++; 386 387 System.err.printf("\rOutputted: %10d", count); 388 key = ReflectionUtils.newInstance(keyClass, config); 389 } 390 System.err.println(); 391 } catch (final Exception e) { 392 throw new RuntimeException(e); 393 } finally { 394 if (reader != null) 395 try { 396 reader.close(); 397 } catch (final IOException e1) { 398 } 399 } 400 } 401 402 public SequenceFileUtility(String uriOrPath, CompressionType compressionType, Map<String, String> metadata) 403 throws IOException 404 { 405 this.compressionType = compressionType; 406 setup(convertToURI(uriOrPath), false); 407 } 408 409 public SequenceFileUtility(URI uri, CompressionType compressionType, Map<String, String> metadata) throws IOException 410 { 411 this.compressionType = compressionType; 412 setup(uri, false); 413 } 414 415 /** 416 * Converts a string representing a file or uri to a uri object. 417 * 418 * @param uriOrPath 419 * uri or path to convert 420 * @return uri 421 */ 422 public static URI convertToURI(String uriOrPath) { 423 if (uriOrPath.contains("://")) { 424 return URI.create(uriOrPath); 425 } else { 426 return new File(uriOrPath).toURI(); 427 } 428 } 429 430 private void setup(URI uri, boolean read) throws IOException { 431 setup(uri, read, null); 432 } 433 434 private void setup(URI uri, boolean read, Map<String, String> metadata) throws IOException { 435 fileSystem = getFileSystem(uri); 436 sequenceFilePath = new Path(uri.toString()); 437 438 this.isReader = read; 439 440 if (read) { 441 Reader reader = null; 442 443 try { 444 reader = createReader(); 445 final Text uuidText = reader.getMetadata().get(new Text(MetadataConfiguration.UUID_KEY)); 446 if (uuidText != null) 447 uuid = uuidText.toString(); 448 449 if (!reader.isCompressed()) 450 compressionType = CompressionType.NONE; 451 else if (reader.isBlockCompressed()) 452 compressionType = CompressionType.BLOCK; 453 else 454 compressionType = CompressionType.RECORD; 455 } catch (final Exception e) { 456 throw new RuntimeException(e); 457 } finally { 458 if (reader != null) 459 try { 460 reader.close(); 461 } catch (final IOException e1) { 462 } 463 } 464 } else { 465 if (metadata == null) { 466 metadata = new HashMap<String, String>(); 467 } 468 469 if (!metadata.containsKey(MetadataConfiguration.UUID_KEY)) { 470 uuid = UUID.randomUUID().toString(); 471 metadata.put(MetadataConfiguration.UUID_KEY, uuid); 472 } 473 474 // if the output directory is a directory, then create the file 475 // inside the 476 // directory with the name given by the uuid 477 if (fileSystem.exists(sequenceFilePath) && fileSystem.getFileStatus(sequenceFilePath).isDir()) { 478 sequenceFilePath = new Path(sequenceFilePath, uuid + ".seq"); 479 } 480 481 writer = createWriter(metadata); 482 } 483 } 484 485 @SuppressWarnings("unchecked") 486 private Writer createWriter(Map<String, String> metadata) throws IOException { 487 final Metadata md = new Metadata(); 488 489 for (final Entry<String, String> e : metadata.entrySet()) { 490 md.set(new Text(e.getKey()), new Text(e.getValue())); 491 } 492 final Class<K> keyClass = (Class<K>) ((ParameterizedType) getClass().getGenericSuperclass()) 493 .getActualTypeArguments()[0]; 494 final Class<V> valueClass = (Class<V>) ((ParameterizedType) getClass().getGenericSuperclass()) 495 .getActualTypeArguments()[1]; 496 497 return SequenceFile.createWriter(fileSystem, config, sequenceFilePath, keyClass, valueClass, compressionType, 498 new DefaultCodec(), null, 499 md); 500 } 501 502 private Reader createReader() throws IOException { 503 // if(this.fileSystem.getFileStatus(sequenceFilePath).isDir()) 504 // sequenceFilePath = new Path(sequenceFilePath,"part-r-00000"); 505 return new Reader(fileSystem, sequenceFilePath, config); 506 } 507 508 /** 509 * Get the UUID of this file 510 * 511 * @return UUID 512 */ 513 public String getUUID() { 514 return uuid; 515 } 516 517 /** 518 * Return the metadata map. Read mode only. 519 * 520 * @return metadata 521 */ 522 public Map<Text, Text> getMetadata() { 523 if (!isReader) { 524 throw new UnsupportedOperationException("Cannot read metadata in write mode"); 525 } 526 527 Reader reader = null; 528 try { 529 reader = createReader(); 530 final Map<Text, Text> metadata = reader.getMetadata().getMetadata(); 531 return metadata; 532 } catch (final Exception e) { 533 throw new RuntimeException(e); 534 } finally { 535 if (reader != null) 536 try { 537 reader.close(); 538 } catch (final IOException e1) { 539 } 540 } 541 } 542 543 /** 544 * Return a list of the keys in the sequence file. Read mode only. 545 * 546 * @return keys. 547 */ 548 @SuppressWarnings("unchecked") 549 public List<K> listKeys() { 550 if (!isReader) { 551 throw new UnsupportedOperationException("Cannot read keys in write mode"); 552 } 553 554 Reader reader = null; 555 try { 556 final List<K> keys = new ArrayList<K>(); 557 558 reader = createReader(); 559 final Class<K> keyClass = (Class<K>) reader.getKeyClass(); 560 K key = ReflectionUtils.newInstance(keyClass, config); 561 562 while (reader.next(key)) { 563 keys.add(key); 564 key = ReflectionUtils.newInstance(keyClass, config); 565 } 566 567 return keys; 568 } catch (final Exception e) { 569 throw new RuntimeException(e); 570 } finally { 571 if (reader != null) 572 try { 573 reader.close(); 574 } catch (final IOException e1) { 575 } 576 } 577 } 578 579 /** 580 * Extracts file to a directory. Read mode only. 581 * 582 * @param uriOrPath 583 * path or uri to extract to. 584 * @throws IOException 585 */ 586 public void exportData(String uriOrPath) throws IOException { 587 exportData(uriOrPath, NamingStrategy.KEY, new ExtractionState(), false, 0); 588 } 589 590 /** 591 * Extracts file to a directory. Read mode only. 592 * 593 * @param uriOrPath 594 * path or uri to extract to. 595 * @param naming 596 * the naming strategy 597 * @param extrState 598 * the extraction state 599 * @param addExtension 600 * if true, then file extensions are added to each record 601 * automatically 602 * @param offset 603 * offset from which to start. Can be used to reduce number of 604 * files extracted. 605 * @throws IOException 606 */ 607 public void exportData(String uriOrPath, NamingStrategy naming, ExtractionState extrState, boolean addExtension, 608 long offset) 609 throws IOException 610 { 611 FileSystem fs = null; 612 Path p = null; 613 614 if (uriOrPath != null) { 615 final URI uri = convertToURI(uriOrPath); 616 617 fs = getFileSystem(uri); 618 p = new Path(uri.toString()); 619 } 620 621 exportData(fs, p, naming, extrState, addExtension, offset); 622 } 623 624 public static ZipOutputStream openZipOutputStream(String uriOrPath) throws IOException { 625 final URI uri = convertToURI(uriOrPath); 626 627 final FileSystem fs = getFileSystem(uri, new Configuration()); 628 final Path path = new Path(uri.toString()); 629 630 final ZipOutputStream zos = new ZipOutputStream(fs.create(path)); 631 zos.setLevel(Deflater.BEST_COMPRESSION); 632 return zos; 633 } 634 635 /** 636 * Extracts file to a directory. Read mode only. 637 * 638 * @param uriOrPath 639 * path or uri to extract to. 640 * @param naming 641 * the naming strategy 642 * @param state 643 * the extraction state 644 * @param addExtension 645 * if true, then file extensions are added to each record 646 * automatically 647 * @param offset 648 * offset from which to start. Can be used to reduce number of 649 * files extracted. 650 * @throws IOException 651 */ 652 public void exportDataToZip(String uriOrPath, NamingStrategy naming, ExtractionState state, boolean addExtension, 653 long offset) 654 throws IOException 655 { 656 if (uriOrPath != null) { 657 658 ZipOutputStream zos = null; 659 try { 660 zos = openZipOutputStream(uriOrPath); 661 exportDataToZip(zos, naming, state, addExtension, offset); 662 } finally { 663 if (zos != null) 664 try { 665 zos.close(); 666 } catch (final IOException e) { 667 } 668 ; 669 } 670 } 671 } 672 673 /** 674 * Extracts file to a zip file. Read mode only. 675 * 676 * @param zos 677 * The {@link ZipOutputStream} to write to 678 * @param naming 679 * The naming strategy 680 * @param extrState 681 * The extration state 682 * @param addExtension 683 * if true, then file extensions are added to each record 684 * automatically 685 * @param offset 686 * offset from which to start. Can be used to reduce number of 687 * files extracted. 688 * @throws IOException 689 */ 690 public void exportDataToZip(ZipOutputStream zos, NamingStrategy naming, ExtractionState extrState, 691 boolean addExtension, long offset) 692 throws IOException 693 { 694 if (!isReader) { 695 throw new UnsupportedOperationException("Cannot read keys in write mode"); 696 } 697 698 Reader reader = null; 699 try { 700 reader = createReader(); 701 if (offset > 0) 702 reader.seek(offset); 703 704 @SuppressWarnings("unchecked") 705 final K key = ReflectionUtils.newInstance((Class<K>) reader.getKeyClass(), config); 706 @SuppressWarnings("unchecked") 707 final V val = ReflectionUtils.newInstance((Class<V>) reader.getValueClass(), config); 708 709 while (reader.next(key)) { 710 711 if (extrState.allowNext()) { 712 reader.getCurrentValue(val); 713 714 String name = naming.getName(key, val, extrState, addExtension); 715 716 while (name.startsWith("/")) 717 name = name.substring(1); 718 719 final ZipEntry ze = new ZipEntry(name); 720 zos.putNextEntry(ze); 721 writeZipData(zos, val); 722 zos.closeEntry(); 723 724 extrState.tick(); 725 } else { 726 extrState.tick(); 727 } 728 if (extrState.isFinished()) 729 break; 730 } 731 } catch (final Exception e) { 732 throw new RuntimeException(e); 733 } finally { 734 if (reader != null) 735 try { 736 reader.close(); 737 } catch (final IOException e1) { 738 } 739 } 740 } 741 742 /** 743 * Extracts file to a directory. Read mode only. 744 * 745 * @param fs 746 * filesystem of output file 747 * @param dirPath 748 * path to extract to 749 */ 750 public void exportData(FileSystem fs, Path dirPath) { 751 exportData(fs, dirPath, NamingStrategy.KEY, new ExtractionState(), false, 0); 752 } 753 754 /** 755 * Extracts file to a directory. Read mode only. 756 * 757 * @param fs 758 * filesystem of output file 759 * @param dirPath 760 * path to extract to 761 * @param naming 762 * the naming strategy 763 * @param extrState 764 * the extraction state 765 * @param addExtension 766 * if true, then file extensions are added to each record 767 * automatically 768 * @param offset 769 * offset from which to start. Can be used to reduce number of 770 * files extracted. 771 */ 772 @SuppressWarnings("unchecked") 773 public void exportData(FileSystem fs, Path dirPath, NamingStrategy naming, ExtractionState extrState, 774 boolean addExtension, long offset) 775 { 776 if (!isReader) { 777 throw new UnsupportedOperationException("Cannot read keys in write mode"); 778 } 779 780 Reader reader = null; 781 try { 782 if (fs != null) 783 fs.mkdirs(dirPath); 784 785 reader = createReader(); 786 if (offset > 0) 787 reader.seek(offset); 788 789 final K key = ReflectionUtils.newInstance((Class<K>) reader.getKeyClass(), config); 790 final V val = ReflectionUtils.newInstance((Class<V>) reader.getValueClass(), config); 791 792 while (reader.next(key)) { 793 794 if (extrState.allowNext()) { 795 reader.getCurrentValue(val); 796 if (dirPath != null) { 797 String name = naming.getName(key, val, extrState, addExtension); 798 if (name.startsWith("/")) 799 name = "." + name; 800 801 final Path outFilePath = new Path(dirPath, name); 802 // System.out.println("NP: " + np); 803 System.out.println("Path: " + outFilePath); 804 writeFile(fs, outFilePath, val); 805 extrState.tick(); 806 } else { 807 System.out.println(key.toString()); 808 printFile(val); 809 extrState.tick(); 810 } 811 } else { 812 extrState.tick(); 813 } 814 if (extrState.isFinished()) 815 break; 816 } 817 } catch (final Exception e) { 818 throw new RuntimeException(e); 819 } finally { 820 if (reader != null) 821 try { 822 reader.close(); 823 } catch (final IOException e1) { 824 } 825 } 826 } 827 828 @SuppressWarnings("unchecked") 829 public void exportData(NamingStrategy np, ExtractionState nps, long offset, KeyValueDump<K, V> dump) { 830 if (!isReader) { 831 throw new UnsupportedOperationException("Cannot read keys in write mode"); 832 } 833 834 Reader reader = null; 835 try { 836 837 reader = createReader(); 838 if (offset > 0) 839 reader.seek(offset); 840 841 final K key = ReflectionUtils.newInstance((Class<K>) reader.getKeyClass(), config); 842 final V val = ReflectionUtils.newInstance((Class<V>) reader.getValueClass(), config); 843 844 while (reader.next(key)) { 845 846 if (nps.allowNext()) { 847 reader.getCurrentValue(val); 848 dump.dumpValue(key, val); 849 } 850 nps.tick(); 851 if (nps.isFinished()) 852 break; 853 } 854 } catch (final Exception e) { 855 throw new RuntimeException(e); 856 } finally { 857 if (reader != null) 858 try { 859 reader.close(); 860 } catch (final IOException e1) { 861 } 862 } 863 } 864 865 /** 866 * Close the underlying writer. Does nothing in read mode. 867 * 868 * @throws IOException 869 */ 870 public void close() throws IOException { 871 if (writer != null) 872 writer.close(); 873 } 874 875 /** 876 * Get number of records in file. Read mode only. 877 * 878 * @return number of records 879 */ 880 public long getNumberRecords() { 881 if (!isReader) { 882 throw new UnsupportedOperationException("Cannot read keys in write mode"); 883 } 884 885 Reader reader = null; 886 try { 887 reader = createReader(); 888 889 final Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), config); 890 891 long count = 0; 892 while (reader.next(key)) { 893 count++; 894 } 895 return count; 896 } catch (final Exception e) { 897 throw new RuntimeException(e); 898 } finally { 899 if (reader != null) 900 try { 901 reader.close(); 902 } catch (final IOException e1) { 903 } 904 } 905 } 906 907 /** 908 * @return the compression codec in use for this file. 909 */ 910 public Class<? extends CompressionCodec> getCompressionCodecClass() { 911 if (!isReader) 912 return DefaultCodec.class; 913 914 Reader reader = null; 915 try { 916 reader = createReader(); 917 if (reader.getCompressionCodec() == null) 918 return null; 919 return reader.getCompressionCodec().getClass(); 920 } catch (final Exception e) { 921 throw new RuntimeException(e); 922 } finally { 923 if (reader != null) 924 try { 925 reader.close(); 926 } catch (final IOException e1) { 927 } 928 } 929 } 930 931 /** 932 * @return he compression mode used for this sequence file. 933 */ 934 public CompressionType getCompressionType() { 935 return compressionType; 936 } 937 938 /** 939 * Get the filesystem associated with a uri. 940 * 941 * @param uri 942 * @return the filesystem 943 * @throws IOException 944 */ 945 public FileSystem getFileSystem(URI uri) throws IOException { 946 return getFileSystem(uri, config); 947 } 948 949 /** 950 * Get the filesystem associated with a uri. 951 * 952 * @param uri 953 * @param config 954 * @return the filesystem 955 * @throws IOException 956 */ 957 public static FileSystem getFileSystem(URI uri, Configuration config) throws IOException { 958 FileSystem fs = FileSystem.get(uri, config); 959 if (fs instanceof LocalFileSystem) 960 fs = ((LocalFileSystem) fs).getRaw(); 961 return fs; 962 } 963 964 /** 965 * Get a path from a uri. 966 * 967 * @param uri 968 * @return the path 969 * @throws IOException 970 */ 971 public Path getPath(URI uri) throws IOException { 972 return new Path(uri.toString()); 973 } 974 975 /** 976 * Get the MD5 sum of a file 977 * 978 * @param fs 979 * @param p 980 * @return the md5sum 981 */ 982 public static String md5sum(FileSystem fs, Path p) { 983 MessageDigest digest; 984 try { 985 digest = MessageDigest.getInstance("MD5"); 986 } catch (final NoSuchAlgorithmException e1) { 987 throw new RuntimeException(e1); 988 } 989 990 InputStream is = null; 991 try { 992 final byte[] buffer = new byte[8192]; 993 int read = 0; 994 995 is = fs.open(p); 996 while ((read = is.read(buffer)) > 0) { 997 digest.update(buffer, 0, read); 998 } 999 final byte[] md5sum = digest.digest(); 1000 1001 final BigInteger bigInt = new BigInteger(1, md5sum); 1002 return bigInt.toString(16); 1003 } catch (final IOException e) { 1004 throw new RuntimeException("Unable to process file for MD5", e); 1005 } finally { 1006 try { 1007 if (is != null) 1008 is.close(); 1009 } catch (final IOException e) { 1010 } 1011 } 1012 } 1013 1014 protected abstract V readFile(FileSystem fs, Path path) throws IOException; 1015 1016 protected abstract void writeFile(FileSystem fs, Path path, V value) throws IOException; 1017 1018 protected abstract void writeZipData(ZipOutputStream zos, V value) throws IOException; 1019 1020 protected abstract void printFile(V value) throws IOException; 1021 1022 /** 1023 * Append data read from a file to the sequence file. 1024 * 1025 * @param key 1026 * @param fs 1027 * @param p 1028 * @throws IOException 1029 */ 1030 public void appendFile(K key, FileSystem fs, Path p) throws IOException { 1031 if (isReader) { 1032 throw new UnsupportedOperationException("Cannot write data in read mode"); 1033 } 1034 1035 writer.append(key, readFile(fs, p)); 1036 } 1037 1038 /** 1039 * Append data to a sequence file. 1040 * 1041 * @param key 1042 * @param value 1043 * @throws IOException 1044 */ 1045 public void appendData(K key, V value) throws IOException { 1046 if (isReader) { 1047 throw new UnsupportedOperationException("Cannot write data in read mode"); 1048 } 1049 writer.append(key, value); 1050 } 1051 1052 /** 1053 * Interface for objects that can make a key from a path 1054 * 1055 * @param <K> 1056 */ 1057 public interface KeyProvider<K> { 1058 K getKey(FileSystem fs, Path path); 1059 1060 K getKey(FileSystem fs, Path path, Path base); 1061 } 1062 1063 /** 1064 * A class that provides Text keys by calculating a UUID from the MD5 of a 1065 * file 1066 */ 1067 public static class MD5UUIDKeyProvider implements KeyProvider<Text> { 1068 @Override 1069 public Text getKey(FileSystem fs, Path path) { 1070 final UUID uuid = UUID.nameUUIDFromBytes(SequenceFileUtility.md5sum(fs, path).getBytes()); 1071 return new Text(uuid.toString()); 1072 } 1073 1074 @Override 1075 public Text getKey(FileSystem fs, Path path, Path base) { 1076 return this.getKey(fs, path); 1077 } 1078 } 1079 1080 /** 1081 * A class that provides Text keys from the name of a file 1082 */ 1083 public static class FilenameKeyProvider implements KeyProvider<Text> { 1084 @Override 1085 public Text getKey(FileSystem fs, Path path) { 1086 return new Text(path.getName()); 1087 } 1088 1089 @Override 1090 public Text getKey(FileSystem fs, Path path, Path base) { 1091 return this.getKey(fs, path); 1092 } 1093 } 1094 1095 /** 1096 * A class that provides Text keys from the relative path + name of a file 1097 */ 1098 public static class RelativePathFilenameKeyProvider implements KeyProvider<Text> { 1099 @Override 1100 public Text getKey(FileSystem fs, Path path) { 1101 return new Text(path.toUri().getPath()); 1102 } 1103 1104 @Override 1105 public Text getKey(FileSystem fs, Path path, Path base) { 1106 return new Text(path.toUri().getPath().substring(base.toUri().getPath().length())); 1107 } 1108 } 1109 1110 /** 1111 * Append files to a sequenceFile. 1112 * 1113 * @param fs 1114 * The filesystem of the files being added. 1115 * @param path 1116 * The path of the file(s) being added. 1117 * @param recurse 1118 * If true, then subdirectories are also searched 1119 * @param pathFilter 1120 * Filter for omitting files. Can be null. 1121 * @param keyProvider 1122 * Object that can return a key for a given file. 1123 * @return Paths and their respective keys for files that were added. 1124 * @throws IOException 1125 */ 1126 public Map<Path, K> appendFiles(FileSystem fs, Path path, boolean recurse, PathFilter pathFilter, 1127 KeyProvider<K> keyProvider) 1128 throws IOException 1129 { 1130 final LinkedHashMap<Path, K> addedFiles = new LinkedHashMap<Path, K>(); 1131 appendFiles(fs, path, recurse, pathFilter, keyProvider, addedFiles); 1132 return addedFiles; 1133 } 1134 1135 private void appendFiles(final FileSystem fs, Path path, boolean recurse, PathFilter pathFilter, 1136 KeyProvider<K> keyProvider, 1137 Map<Path, K> addedFiles) throws IOException 1138 { 1139 if (fs.isFile(path)) { 1140 if (pathFilter == null || pathFilter.accept(path)) { 1141 final K key = keyProvider.getKey(fs, path); 1142 appendFile(key, fs, path); 1143 addedFiles.put(path, key); 1144 } 1145 } else if (recurse) { 1146 // fs.listStatus(path); 1147 final FileStatus[] status = fs.listStatus(path, new PathFilter() { 1148 1149 @Override 1150 public boolean accept(Path potential) { 1151 try { 1152 fs.getStatus(potential); 1153 return true; 1154 } catch (final IOException e) { 1155 return false; 1156 } 1157 } 1158 1159 }); 1160 for (final FileStatus stat : status) { 1161 appendFiles(fs, stat.getPath(), path.getParent(), pathFilter, keyProvider, addedFiles); 1162 } 1163 } 1164 } 1165 1166 private void appendFiles(FileSystem fs, Path path, Path base, PathFilter pathFilter, KeyProvider<K> keyProvider, 1167 Map<Path, K> addedFiles) 1168 throws IOException 1169 { 1170 if (fs.isFile(path)) { 1171 if (pathFilter == null || pathFilter.accept(path)) { 1172 final K key = keyProvider.getKey(fs, path, base); 1173 appendFile(key, fs, path); 1174 addedFiles.put(path, key); 1175 } 1176 } else { 1177 try { 1178 final FileStatus[] status = fs.listStatus(path); 1179 1180 for (final FileStatus stat : status) { 1181 appendFiles(fs, stat.getPath(), base, pathFilter, keyProvider, addedFiles); 1182 } 1183 } catch (final Throwable e) { 1184 System.err.println("Failed listing status on path: " + path); 1185 } 1186 } 1187 } 1188 1189 public void writePathMap(Map<Path, K> map) throws IOException { 1190 final Path p = new Path(sequenceFilePath.getParent(), sequenceFilePath.getName().substring(0, 1191 sequenceFilePath.getName().lastIndexOf(".")) 1192 + "-map.txt"); 1193 FSDataOutputStream dos = null; 1194 PrintWriter pw = null; 1195 1196 try { 1197 dos = fileSystem.create(p); 1198 pw = new PrintWriter(dos); 1199 1200 for (final Entry<Path, K> e : map.entrySet()) { 1201 pw.println(e.getValue() + " " + e.getKey()); 1202 } 1203 } finally { 1204 if (pw != null) 1205 pw.close(); 1206 if (dos != null) 1207 try { 1208 dos.close(); 1209 } catch (final IOException e) { 1210 } 1211 } 1212 } 1213 1214 /** 1215 * Search for the record identified by queryKey. 1216 * 1217 * @param queryKey 1218 * the key. 1219 * @param offset 1220 * the offset from which to commence search 1221 * @return the found value, or null. 1222 */ 1223 @SuppressWarnings("unchecked") 1224 public V find(K queryKey, long offset) { 1225 if (!isReader) { 1226 throw new UnsupportedOperationException("Cannot find key in write mode"); 1227 } 1228 1229 Reader reader = null; 1230 try { 1231 reader = createReader(); 1232 if (offset > 0) 1233 reader.seek(offset); 1234 1235 final K key = ReflectionUtils.newInstance((Class<K>) reader.getKeyClass(), config); 1236 1237 while (reader.next(key)) { 1238 System.out.println(key); 1239 if (key.equals(queryKey)) { 1240 final V val = ReflectionUtils.newInstance((Class<V>) reader.getValueClass(), config); 1241 1242 reader.getCurrentValue(val); 1243 1244 return val; 1245 } 1246 } 1247 return null; 1248 } catch (final Exception e) { 1249 throw new RuntimeException(e); 1250 } finally { 1251 if (reader != null) 1252 try { 1253 reader.close(); 1254 } catch (final IOException e1) { 1255 } 1256 } 1257 } 1258 1259 /** 1260 * Search for the record identified by queryKey. Uses a linear search from 1261 * the beginning of the file. 1262 * 1263 * @param queryKey 1264 * @return the found value, or null. 1265 */ 1266 public V find(K queryKey) { 1267 return find(queryKey, 0); 1268 } 1269 1270 /** 1271 * Find a record and write the value to a file. 1272 * 1273 * @param key 1274 * @param uriOrPath 1275 * @param offset 1276 * @return false if record not found, true otherwise. 1277 * @throws IOException 1278 */ 1279 public boolean findAndExport(K key, String uriOrPath, long offset) throws IOException { 1280 FileSystem fs = null; 1281 Path p = null; 1282 1283 if (uriOrPath != null) { 1284 final URI uri = convertToURI(uriOrPath); 1285 1286 fs = getFileSystem(uri); 1287 p = new Path(uri.toString()); 1288 } 1289 1290 return findAndExport(key, fs, p, offset); 1291 } 1292 1293 /** 1294 * Find a record and write the value to a file. 1295 * 1296 * @param key 1297 * @param fs 1298 * @param dirPath 1299 * @param offset 1300 * @return false if record not found, true otherwise. 1301 * @throws IOException 1302 */ 1303 public boolean findAndExport(K key, FileSystem fs, Path dirPath, long offset) throws IOException { 1304 final V value = find(key, offset); 1305 1306 if (value == null) 1307 return false; 1308 1309 if (fs != null && fs != null) { 1310 final Path outFilePath = new Path(dirPath, key.toString()); 1311 writeFile(fs, outFilePath, value); 1312 } else { 1313 printFile(value); 1314 } 1315 1316 return true; 1317 } 1318 1319 public Path getSequenceFilePath() { 1320 return sequenceFilePath; 1321 } 1322 1323 class SequenceFileEntry implements Entry<K, V> { 1324 K key; 1325 V value; 1326 1327 public SequenceFileEntry(K k, V v) { 1328 key = k; 1329 value = v; 1330 } 1331 1332 @Override 1333 public K getKey() { 1334 return key; 1335 } 1336 1337 @Override 1338 public V getValue() { 1339 return value; 1340 } 1341 1342 @Override 1343 public V setValue(V value) { 1344 this.value = value; 1345 return value; 1346 } 1347 } 1348 1349 class SequenceFileIterator implements Iterator<Entry<K, V>> { 1350 Reader reader = null; 1351 Entry<K, V> next; 1352 boolean shouldMove = true; 1353 1354 @SuppressWarnings("unchecked") 1355 public SequenceFileIterator() { 1356 try { 1357 reader = createReader(); 1358 1359 next = new SequenceFileEntry(ReflectionUtils.newInstance((Class<K>) reader.getKeyClass(), config), 1360 ReflectionUtils.newInstance( 1361 (Class<V>) reader.getValueClass(), config)); 1362 } catch (final IOException e) { 1363 throw new RuntimeException(e); 1364 } 1365 } 1366 1367 @Override 1368 public boolean hasNext() { 1369 tryGetNext(); 1370 return next != null; 1371 } 1372 1373 private void tryGetNext() { 1374 if (next != null && shouldMove) { 1375 shouldMove = false; 1376 try { 1377 if (!reader.next(next.getKey(), next.getValue())) { 1378 next = null; 1379 try { 1380 reader.close(); 1381 } catch (final IOException e1) { 1382 } 1383 } 1384 } catch (final IOException e) { 1385 try { 1386 reader.close(); 1387 } catch (final IOException e1) { 1388 } 1389 throw new RuntimeException(e); 1390 } 1391 } 1392 } 1393 1394 @Override 1395 public Entry<K, V> next() { 1396 tryGetNext(); 1397 1398 if (next == null) { 1399 throw new NoSuchElementException(); 1400 } 1401 shouldMove = true; 1402 return next; 1403 } 1404 1405 @Override 1406 public void remove() { 1407 } 1408 } 1409 1410 @Override 1411 public Iterator<Entry<K, V>> iterator() { 1412 if (!isReader) { 1413 throw new UnsupportedOperationException("Cannot iterate in write mode"); 1414 } 1415 1416 return new SequenceFileIterator(); 1417 } 1418}