001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.commons.collections.buffer;
018
019 import java.io.PrintWriter;
020 import java.io.StringWriter;
021 import java.util.Collection;
022
023 import org.apache.commons.collections.Buffer;
024 import org.apache.commons.collections.BufferUnderflowException;
025
026 /**
027 * Decorates another <code>Buffer</code> to make {@link #get()} and
028 * {@link #remove()} block when the <code>Buffer</code> is empty.
029 * <p>
030 * If either <code>get</code> or <code>remove</code> is called on an empty
031 * <code>Buffer</code>, the calling thread waits for notification that
032 * an <code>add</code> or <code>addAll</code> operation has completed.
033 * <p>
034 * When one or more entries are added to an empty <code>Buffer</code>,
035 * all threads blocked in <code>get</code> or <code>remove</code> are notified.
036 * There is no guarantee that concurrent blocked <code>get</code> or
037 * <code>remove</code> requests will be "unblocked" and receive data in the
038 * order that they arrive.
039 * <p>
040 * This class is Serializable from Commons Collections 3.1.
041 * This class contains an extra field in 3.2, however the serialization
042 * specification will handle this gracefully.
043 *
044 * @author Stephen Colebourne
045 * @author Janek Bogucki
046 * @author Phil Steitz
047 * @author James Carman
048 * @version $Revision: 646777 $ $Date: 2008-04-10 13:33:15 +0100 (Thu, 10 Apr 2008) $
049 * @since Commons Collections 3.0
050 */
051 public class BlockingBuffer extends SynchronizedBuffer {
052
053 /** Serialization version. */
054 private static final long serialVersionUID = 1719328905017860541L;
055 /** The timeout value in milliseconds. */
056 private final long timeout;
057
058 /**
059 * Factory method to create a blocking buffer.
060 *
061 * @param buffer the buffer to decorate, must not be null
062 * @return a new blocking Buffer
063 * @throws IllegalArgumentException if buffer is null
064 */
065 public static Buffer decorate(Buffer buffer) {
066 return new BlockingBuffer(buffer);
067 }
068
069 /**
070 * Factory method to create a blocking buffer with a timeout value.
071 *
072 * @param buffer the buffer to decorate, must not be null
073 * @param timeoutMillis the timeout value in milliseconds, zero or less for no timeout
074 * @return a new blocking buffer
075 * @throws IllegalArgumentException if the buffer is null
076 * @since Commons Collections 3.2
077 */
078 public static Buffer decorate(Buffer buffer, long timeoutMillis) {
079 return new BlockingBuffer(buffer, timeoutMillis);
080 }
081
082 //-----------------------------------------------------------------------
083 /**
084 * Constructor that wraps (not copies).
085 *
086 * @param buffer the buffer to decorate, must not be null
087 * @throws IllegalArgumentException if the buffer is null
088 */
089 protected BlockingBuffer(Buffer buffer) {
090 super(buffer);
091 this.timeout = 0;
092 }
093
094 /**
095 * Constructor that wraps (not copies).
096 *
097 * @param buffer the buffer to decorate, must not be null
098 * @param timeoutMillis the timeout value in milliseconds, zero or less for no timeout
099 * @throws IllegalArgumentException if the buffer is null
100 * @since Commons Collections 3.2
101 */
102 protected BlockingBuffer(Buffer buffer, long timeoutMillis) {
103 super(buffer);
104 this.timeout = (timeoutMillis < 0 ? 0 : timeoutMillis);
105 }
106
107 //-----------------------------------------------------------------------
108 public boolean add(Object o) {
109 synchronized (lock) {
110 boolean result = collection.add(o);
111 lock.notifyAll();
112 return result;
113 }
114 }
115
116 public boolean addAll(Collection c) {
117 synchronized (lock) {
118 boolean result = collection.addAll(c);
119 lock.notifyAll();
120 return result;
121 }
122 }
123
124 /**
125 * Gets the next value from the buffer, waiting until an object is
126 * added if the buffer is empty. This method uses the default timeout
127 * set in the constructor.
128 *
129 * @throws BufferUnderflowException if an interrupt is received
130 */
131 public Object get() {
132 synchronized (lock) {
133 while (collection.isEmpty()) {
134 try {
135 if (timeout <= 0) {
136 lock.wait();
137 } else {
138 return get(timeout);
139 }
140 } catch (InterruptedException e) {
141 PrintWriter out = new PrintWriter(new StringWriter());
142 e.printStackTrace(out);
143 throw new BufferUnderflowException("Caused by InterruptedException: " + out.toString());
144 }
145 }
146 return getBuffer().get();
147 }
148 }
149
150 /**
151 * Gets the next value from the buffer, waiting until an object is
152 * added for up to the specified timeout value if the buffer is empty.
153 *
154 * @param timeout the timeout value in milliseconds
155 * @throws BufferUnderflowException if an interrupt is received
156 * @throws BufferUnderflowException if the timeout expires
157 * @since Commons Collections 3.2
158 */
159 public Object get(final long timeout) {
160 synchronized (lock) {
161 final long expiration = System.currentTimeMillis() + timeout;
162 long timeLeft = expiration - System.currentTimeMillis();
163 while (timeLeft > 0 && collection.isEmpty()) {
164 try {
165 lock.wait(timeLeft);
166 timeLeft = expiration - System.currentTimeMillis();
167 } catch(InterruptedException e) {
168 PrintWriter out = new PrintWriter(new StringWriter());
169 e.printStackTrace(out);
170 throw new BufferUnderflowException("Caused by InterruptedException: " + out.toString());
171 }
172 }
173 if (collection.isEmpty()) {
174 throw new BufferUnderflowException("Timeout expired");
175 }
176 return getBuffer().get();
177 }
178 }
179
180 /**
181 * Removes the next value from the buffer, waiting until an object is
182 * added if the buffer is empty. This method uses the default timeout
183 * set in the constructor.
184 *
185 * @throws BufferUnderflowException if an interrupt is received
186 */
187 public Object remove() {
188 synchronized (lock) {
189 while (collection.isEmpty()) {
190 try {
191 if (timeout <= 0) {
192 lock.wait();
193 } else {
194 return remove(timeout);
195 }
196 } catch (InterruptedException e) {
197 PrintWriter out = new PrintWriter(new StringWriter());
198 e.printStackTrace(out);
199 throw new BufferUnderflowException("Caused by InterruptedException: " + out.toString());
200 }
201 }
202 return getBuffer().remove();
203 }
204 }
205
206 /**
207 * Removes the next value from the buffer, waiting until an object is
208 * added for up to the specified timeout value if the buffer is empty.
209 *
210 * @param timeout the timeout value in milliseconds
211 * @throws BufferUnderflowException if an interrupt is received
212 * @throws BufferUnderflowException if the timeout expires
213 * @since Commons Collections 3.2
214 */
215 public Object remove(final long timeout) {
216 synchronized (lock) {
217 final long expiration = System.currentTimeMillis() + timeout;
218 long timeLeft = expiration - System.currentTimeMillis();
219 while (timeLeft > 0 && collection.isEmpty()) {
220 try {
221 lock.wait(timeLeft);
222 timeLeft = expiration - System.currentTimeMillis();
223 } catch(InterruptedException e) {
224 PrintWriter out = new PrintWriter(new StringWriter());
225 e.printStackTrace(out);
226 throw new BufferUnderflowException("Caused by InterruptedException: " + out.toString());
227 }
228 }
229 if (collection.isEmpty()) {
230 throw new BufferUnderflowException("Timeout expired");
231 }
232 return getBuffer().remove();
233 }
234 }
235
236 }