001/** 002 * Copyright (c) 2011, The University of Southampton and the individual contributors. 003 * All rights reserved. 004 * 005 * Redistribution and use in source and binary forms, with or without modification, 006 * are permitted provided that the following conditions are met: 007 * 008 * * Redistributions of source code must retain the above copyright notice, 009 * this list of conditions and the following disclaimer. 010 * 011 * * Redistributions in binary form must reproduce the above copyright notice, 012 * this list of conditions and the following disclaimer in the documentation 013 * and/or other materials provided with the distribution. 014 * 015 * * Neither the name of the University of Southampton nor the names of its 016 * contributors may be used to endorse or promote products derived from this 017 * software without specific prior written permission. 018 * 019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR 023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 029 */ 030package org.openimaj.util.stream; 031 032import java.util.Iterator; 033import java.util.concurrent.ThreadPoolExecutor; 034 035import org.openimaj.util.function.Function; 036import org.openimaj.util.function.MultiFunction; 037import org.openimaj.util.function.Operation; 038import org.openimaj.util.function.Predicate; 039import org.openimaj.util.parallel.Parallel; 040 041/** 042 * Interface describing a stream of data items. Streams are sequences of items 043 * supporting both sequential and parallel bulk operations. Streams support lazy 044 * transformative operations (transforming a stream to another stream) such as 045 * {@link #filter(Predicate)} and {@link #map(Function)}, and consuming 046 * operations, such as {@link #forEach(Operation)} and {@link #next()}. 047 * <p> 048 * Streams may be either bounded or infinite in length. Once an item has been 049 * extracted from a stream, it is said to be consumed and is no longer available 050 * for operations on the stream. 051 * 052 * @author Jonathon Hare (jsh2@ecs.soton.ac.uk) 053 * 054 * @param <T> 055 * The type of data item in the stream 056 */ 057public interface Stream<T> extends Iterator<T>, Iterable<T> { 058 059 /** 060 * Apply the given {@link Operation} to each item in the stream. Items are 061 * presented to the {@link Operation} in the order they appear in the 062 * stream. 063 * <p> 064 * Note: for an unbounded stream, this method will never return unless some 065 * form of exception is raised. 066 * 067 * @param op 068 * the {@link Operation} to apply 069 */ 070 public void forEach(Operation<T> op); 071 072 /** 073 * Apply the given {@link Operation} to each item in the stream. Items are 074 * presented to the {@link Operation} in the order they appear in the 075 * stream. The given {@link Predicate} can be used to stop processing of the 076 * stream once some condition is met. 077 * <p> 078 * Note: for an unbounded stream, this method will never return unless some 079 * form of exception is raised or the condition of the 080 * <tt>stopPredicate</tt> is met. 081 * 082 * @param operation 083 * the {@link Operation} to apply 084 * @param stopPredicate 085 * a predicate representing a condition that once met causes 086 * processing to stop 087 */ 088 public void forEach(Operation<T> operation, Predicate<T> stopPredicate); 089 090 /** 091 * Apply the given {@link Operation} to each item in the stream. Items are 092 * presented to the {@link Operation} in the order they appear in the 093 * stream. The given {@link Predicate} can be used to stop processing of the 094 * stream once some condition is met. 095 * <p> 096 * Note: for an unbounded stream, this method will never return unless some 097 * form of exception is raised or the condition of the 098 * <tt>stopPredicate</tt> is met. 099 * 100 * @param operation 101 * the {@link Operation} to apply 102 * @param limit 103 * the number of items to read from the stream 104 * @return 105 * the number of items read 106 */ 107 public int forEach(Operation<T> operation, int limit); 108 109 /** 110 * Apply the given {@link Operation} to each item in the stream, making use 111 * of multiple threads. The order in which operations are performed on the 112 * stream is not guaranteed. 113 * <p> 114 * This method is intended to be a shortcut to calling 115 * {@link Parallel#forEachUnpartitioned(Iterator, Operation)}. 116 * <p> 117 * Note: for an unbounded stream, this method will never return unless some 118 * form of exception is raised. 119 * 120 * @param op 121 * the {@link Operation} to apply 122 */ 123 public void parallelForEach(Operation<T> op); 124 125 /** 126 * Apply the given {@link Operation} to each item in the stream, making use 127 * of multiple threads. The order in which operations are performed on the 128 * stream is not guaranteed. 129 * <p> 130 * This method is intended to be a shortcut to calling 131 * {@link Parallel#forEachUnpartitioned(Iterator, Operation, ThreadPoolExecutor)}. 132 * <p> 133 * Note: for an unbounded stream, this method will never return unless some 134 * form of exception is raised. 135 * 136 * @param op 137 * the {@link Operation} to apply 138 * @param pool 139 * the thread pool. 140 */ 141 public void parallelForEach(Operation<T> op, ThreadPoolExecutor pool); 142 143 /** 144 * Transform the stream by creating a view that consists of only the items 145 * that match the given {@link Predicate}. 146 * 147 * @param filter 148 * the predicate 149 * @return a new stream consisting of the matched items from this stream 150 */ 151 public Stream<T> filter(Predicate<T> filter); 152 153 /** 154 * Transform the stream by creating a new stream that transforms the items 155 * in this stream with the given {@link Function}. 156 * 157 * @param mapper 158 * the function to apply 159 * @return a new stream with transformed items from this stream 160 */ 161 public <R> Stream<R> map(Function<T, R> mapper); 162 163 /** 164 * Transform the stream by creating a new stream that transforms the items 165 * in this stream with the given {@link Function}. 166 * 167 * @param mapper 168 * the function to apply 169 * @return a new stream with transformed items from this stream 170 */ 171 public <R> Stream<R> map(MultiFunction<T, R> mapper); 172 173 /** 174 * Transform the stream using the given function to transform the items in 175 * this stream. 176 * 177 * @param transform 178 * the transform function 179 * @return a new stream with transformed items from this stream 180 */ 181 public <R> Stream<R> transform(Function<Stream<T>, Stream<R>> transform); 182}