001    /*
002     *  Licensed to the Apache Software Foundation (ASF) under one or more
003     *  contributor license agreements.  See the NOTICE file distributed with
004     *  this work for additional information regarding copyright ownership.
005     *  The ASF licenses this file to You under the Apache License, Version 2.0
006     *  (the "License"); you may not use this file except in compliance with
007     *  the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     *  Unless required by applicable law or agreed to in writing, software
012     *  distributed under the License is distributed on an "AS IS" BASIS,
013     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     *  See the License for the specific language governing permissions and
015     *  limitations under the License.
016     */
017    package org.apache.commons.collections.buffer;
018    
019    import java.io.PrintWriter;
020    import java.io.StringWriter;
021    import java.util.Collection;
022    
023    import org.apache.commons.collections.Buffer;
024    import org.apache.commons.collections.BufferUnderflowException;
025    
026    /**
027     * Decorates another <code>Buffer</code> to make {@link #get()} and
028     * {@link #remove()} block when the <code>Buffer</code> is empty.
029     * <p>
030     * If either <code>get</code> or <code>remove</code> is called on an empty
031     * <code>Buffer</code>, the calling thread waits for notification that
032     * an <code>add</code> or <code>addAll</code> operation has completed.
033     * <p>
034     * When one or more entries are added to an empty <code>Buffer</code>,
035     * all threads blocked in <code>get</code> or <code>remove</code> are notified.
036     * There is no guarantee that concurrent blocked <code>get</code> or
037     * <code>remove</code> requests will be "unblocked" and receive data in the
038     * order that they arrive.
039     * <p>
040     * This class is Serializable from Commons Collections 3.1.
041     * This class contains an extra field in 3.2, however the serialization
042     * specification will handle this gracefully.
043     *
044     * @author Stephen Colebourne
045     * @author Janek Bogucki
046     * @author Phil Steitz
047     * @author James Carman
048     * @version $Revision: 646777 $ $Date: 2008-04-10 13:33:15 +0100 (Thu, 10 Apr 2008) $
049     * @since Commons Collections 3.0
050     */
051    public class BlockingBuffer extends SynchronizedBuffer {
052    
053        /** Serialization version. */
054        private static final long serialVersionUID = 1719328905017860541L;
055        /** The timeout value in milliseconds. */
056        private final long timeout;
057    
058        /**
059         * Factory method to create a blocking buffer.
060         *
061         * @param buffer the buffer to decorate, must not be null
062         * @return a new blocking Buffer
063         * @throws IllegalArgumentException if buffer is null
064         */
065        public static Buffer decorate(Buffer buffer) {
066            return new BlockingBuffer(buffer);
067        }
068    
069        /**
070         * Factory method to create a blocking buffer with a timeout value.
071         *
072         * @param buffer  the buffer to decorate, must not be null
073         * @param timeoutMillis  the timeout value in milliseconds, zero or less for no timeout
074         * @return a new blocking buffer
075         * @throws IllegalArgumentException if the buffer is null
076         * @since Commons Collections 3.2
077         */
078        public static Buffer decorate(Buffer buffer, long timeoutMillis) {
079            return new BlockingBuffer(buffer, timeoutMillis);
080        }
081    
082        //-----------------------------------------------------------------------    
083        /**
084         * Constructor that wraps (not copies).
085         *
086         * @param buffer the buffer to decorate, must not be null
087         * @throws IllegalArgumentException if the buffer is null
088         */
089        protected BlockingBuffer(Buffer buffer) {
090            super(buffer);
091            this.timeout = 0;
092        }
093    
094        /**
095         * Constructor that wraps (not copies).
096         *
097         * @param buffer  the buffer to decorate, must not be null
098         * @param timeoutMillis  the timeout value in milliseconds, zero or less for no timeout
099         * @throws IllegalArgumentException if the buffer is null
100         * @since Commons Collections 3.2
101         */
102        protected BlockingBuffer(Buffer buffer, long timeoutMillis) {
103            super(buffer);
104            this.timeout = (timeoutMillis < 0 ? 0 : timeoutMillis);
105        }
106    
107        //-----------------------------------------------------------------------
108        public boolean add(Object o) {
109            synchronized (lock) {
110                boolean result = collection.add(o);
111                lock.notifyAll();
112                return result;
113            }
114        }
115    
116        public boolean addAll(Collection c) {
117            synchronized (lock) {
118                boolean result = collection.addAll(c);
119                lock.notifyAll();
120                return result;
121            }
122        }
123    
124        /**
125         * Gets the next value from the buffer, waiting until an object is
126         * added if the buffer is empty. This method uses the default timeout
127         * set in the constructor.
128         *
129         * @throws BufferUnderflowException if an interrupt is received
130         */
131        public Object get() {
132            synchronized (lock) {
133                while (collection.isEmpty()) {
134                    try {
135                        if (timeout <= 0) {
136                            lock.wait();
137                        } else {
138                            return get(timeout);
139                        }
140                    } catch (InterruptedException e) {
141                        PrintWriter out = new PrintWriter(new StringWriter());
142                        e.printStackTrace(out);
143                        throw new BufferUnderflowException("Caused by InterruptedException: " + out.toString());
144                    }
145                }
146                return getBuffer().get();
147            }
148        }
149    
150        /**
151         * Gets the next value from the buffer, waiting until an object is
152         * added for up to the specified timeout value if the buffer is empty.
153         *
154         * @param timeout  the timeout value in milliseconds
155         * @throws BufferUnderflowException if an interrupt is received
156         * @throws BufferUnderflowException if the timeout expires
157         * @since Commons Collections 3.2
158         */
159        public Object get(final long timeout) {
160            synchronized (lock) {
161                final long expiration = System.currentTimeMillis() + timeout;
162                long timeLeft = expiration - System.currentTimeMillis();
163                while (timeLeft > 0 && collection.isEmpty()) {
164                    try {
165                        lock.wait(timeLeft);
166                        timeLeft = expiration - System.currentTimeMillis();
167                    } catch(InterruptedException e) {
168                        PrintWriter out = new PrintWriter(new StringWriter());
169                        e.printStackTrace(out);
170                        throw new BufferUnderflowException("Caused by InterruptedException: " + out.toString());
171                    }
172                }
173                if (collection.isEmpty()) {
174                    throw new BufferUnderflowException("Timeout expired");
175                }
176                return getBuffer().get();
177            }
178        }
179    
180        /**
181         * Removes the next value from the buffer, waiting until an object is
182         * added if the buffer is empty. This method uses the default timeout
183         * set in the constructor.
184         *
185         * @throws BufferUnderflowException if an interrupt is received
186         */
187        public Object remove() {
188            synchronized (lock) {
189                while (collection.isEmpty()) {
190                    try {
191                        if (timeout <= 0) {
192                            lock.wait();
193                        } else {
194                            return remove(timeout);
195                        }
196                    } catch (InterruptedException e) {
197                        PrintWriter out = new PrintWriter(new StringWriter());
198                        e.printStackTrace(out);
199                        throw new BufferUnderflowException("Caused by InterruptedException: " + out.toString());
200                    }
201                }
202                return getBuffer().remove();
203            }
204        }
205    
206        /**
207         * Removes the next value from the buffer, waiting until an object is
208         * added for up to the specified timeout value if the buffer is empty.
209         *
210         * @param timeout  the timeout value in milliseconds
211         * @throws BufferUnderflowException if an interrupt is received
212         * @throws BufferUnderflowException if the timeout expires
213         * @since Commons Collections 3.2
214         */
215        public Object remove(final long timeout) {
216            synchronized (lock) {
217                final long expiration = System.currentTimeMillis() + timeout;
218                long timeLeft = expiration - System.currentTimeMillis();
219                while (timeLeft > 0 && collection.isEmpty()) {
220                    try {
221                        lock.wait(timeLeft);
222                        timeLeft = expiration - System.currentTimeMillis();
223                    } catch(InterruptedException e) {
224                        PrintWriter out = new PrintWriter(new StringWriter());
225                        e.printStackTrace(out);
226                        throw new BufferUnderflowException("Caused by InterruptedException: " + out.toString());
227                    }
228                }
229                if (collection.isEmpty()) {
230                    throw new BufferUnderflowException("Timeout expired");
231                }
232                return getBuffer().remove();
233            }
234        }
235    
236    }