001package org.apache.commons.jcs3.auxiliary.lateral.socket.tcp;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.util.ArrayList;
023import java.util.concurrent.ConcurrentHashMap;
024import java.util.concurrent.ConcurrentMap;
025import java.util.concurrent.CopyOnWriteArrayList;
026
027import org.apache.commons.jcs3.auxiliary.lateral.LateralCacheNoWait;
028import org.apache.commons.jcs3.auxiliary.lateral.LateralCacheNoWaitFacade;
029import org.apache.commons.jcs3.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
030import org.apache.commons.jcs3.engine.behavior.ICompositeCacheManager;
031import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
032import org.apache.commons.jcs3.engine.control.CompositeCacheManager;
033import org.apache.commons.jcs3.engine.logging.behavior.ICacheEventLogger;
034import org.apache.commons.jcs3.log.Log;
035import org.apache.commons.jcs3.log.LogManager;
036import org.apache.commons.jcs3.utils.discovery.DiscoveredService;
037import org.apache.commons.jcs3.utils.discovery.behavior.IDiscoveryListener;
038
039/**
040 * This knows how to add and remove discovered services. It observes UDP discovery events.
041 * <p>
042 * We can have one listener per region, or one shared by all regions.
043 */
044public class LateralTCPDiscoveryListener
045    implements IDiscoveryListener
046{
047    /** The log factory */
048    private static final Log log = LogManager.getLog( LateralTCPDiscoveryListener.class );
049
050    /**
051     * Map of no wait facades. these are used to determine which regions are locally configured to
052     * use laterals.
053     */
054    private final ConcurrentMap<String, LateralCacheNoWaitFacade<?, ?>> facades =
055        new ConcurrentHashMap<>();
056
057    /**
058     * List of regions that are configured differently here than on another server. We keep track of
059     * this to limit the amount of info logging.
060     */
061    private final CopyOnWriteArrayList<String> knownDifferentlyConfiguredRegions =
062        new CopyOnWriteArrayList<>();
063
064    /** The name of the cache factory */
065    private final String factoryName;
066
067    /** Reference to the cache manager for auxiliary cache access */
068    private final CompositeCacheManager cacheManager;
069
070    /** Reference to the cache event logger for auxiliary cache creation */
071    private final ICacheEventLogger cacheEventLogger;
072
073    /** Reference to the cache element serializer for auxiliary cache creation */
074    private final IElementSerializer elementSerializer;
075
076    /**
077     * This plugs into the udp discovery system. It will receive add and remove events.
078     * <p>
079     * @param factoryName the name of the related cache factory
080     * @param cacheManager the global cache manager
081     * @deprecated Use constructor with four parameters
082     */
083    @Deprecated
084    protected LateralTCPDiscoveryListener( final String factoryName, final ICompositeCacheManager cacheManager )
085    {
086        this(factoryName, (CompositeCacheManager) cacheManager, null, null);
087    }
088
089    /**
090     * This plugs into the udp discovery system. It will receive add and remove events.
091     * <p>
092     * @param factoryName the name of the related cache factory
093     * @param cacheManager the global cache manager
094     * @param cacheEventLogger Reference to the cache event logger for auxiliary cache creation
095     * @param elementSerializer Reference to the cache element serializer for auxiliary cache
096     * creation
097     * @since 3.1
098     */
099    protected LateralTCPDiscoveryListener( final String factoryName,
100            final CompositeCacheManager cacheManager,
101            final ICacheEventLogger cacheEventLogger,
102            final IElementSerializer elementSerializer)
103    {
104        this.factoryName = factoryName;
105        this.cacheManager = cacheManager;
106        this.cacheEventLogger = cacheEventLogger;
107        this.elementSerializer = elementSerializer;
108    }
109
110    /**
111     * Adds a nowait facade under this cachename. If one already existed, it will be overridden.
112     * <p>
113     * This adds nowaits to a facade for the region name. If the region has no facade, then it is
114     * not configured to use the lateral cache, and no facade will be created.
115     * <p>
116     * @param cacheName - the region name
117     * @param facade - facade (for region) =&gt; multiple lateral clients.
118     * @return true if the facade was not already registered.
119     */
120    public boolean addNoWaitFacade( final String cacheName, final LateralCacheNoWaitFacade<?, ?> facade )
121    {
122        final boolean isNew = !containsNoWaitFacade( cacheName );
123
124        // override or put anew, it doesn't matter
125        facades.put( cacheName, facade );
126        knownDifferentlyConfiguredRegions.remove( cacheName );
127
128        return isNew;
129    }
130
131    /**
132     * Allows us to see if the facade is present.
133     * <p>
134     * @param cacheName - facades are for a region
135     * @return do we contain the no wait. true if so
136     */
137    public boolean containsNoWaitFacade( final String cacheName )
138    {
139        return facades.containsKey( cacheName );
140    }
141
142    /**
143     * Allows us to see if the facade is present and if it has the no wait.
144     * <p>
145     * @param cacheName - facades are for a region
146     * @param noWait - is this no wait in the facade
147     * @return do we contain the no wait. true if so
148     */
149    public <K, V> boolean containsNoWait( final String cacheName, final LateralCacheNoWait<K, V> noWait )
150    {
151        @SuppressWarnings("unchecked") // Need to cast because of common map for all facades
152        final
153        LateralCacheNoWaitFacade<K, V> facade =
154            (LateralCacheNoWaitFacade<K, V>)facades.get( noWait.getCacheName() );
155
156        if ( facade == null )
157        {
158            return false;
159        }
160
161        return facade.containsNoWait( noWait );
162    }
163
164    /**
165     * When a broadcast is received from the UDP Discovery receiver, for each cacheName in the
166     * message, the add no wait will be called here. To add a no wait, the facade is looked up for
167     * this cache name.
168     * <p>
169     * Each region has a facade. The facade contains a list of end points--the other tcp lateral
170     * services.
171     * <p>
172     * @param noWait
173     * @return true if we found the no wait and added it. False if the no wait was not present or if
174     *         we already had it.
175     */
176    protected <K, V> boolean addNoWait( final LateralCacheNoWait<K, V> noWait )
177    {
178        @SuppressWarnings("unchecked") // Need to cast because of common map for all facades
179        final
180        LateralCacheNoWaitFacade<K, V> facade =
181            (LateralCacheNoWaitFacade<K, V>)facades.get( noWait.getCacheName() );
182        log.debug( "addNoWait > Got facade for {0} = {1}", noWait.getCacheName(), facade );
183
184        return addNoWait(noWait, facade);
185    }
186
187    /**
188     * When a broadcast is received from the UDP Discovery receiver, for each cacheName in the
189     * message, the add no wait will be called here.
190     * <p>
191     * @param noWait the no wait
192     * @param facade the related facade
193     * @return true if we found the no wait and added it. False if the no wait was not present or if
194     *         we already had it.
195     * @since 3.1
196     */
197    protected <K, V> boolean addNoWait(final LateralCacheNoWait<K, V> noWait,
198            final LateralCacheNoWaitFacade<K, V> facade)
199    {
200        if ( facade != null )
201        {
202            final boolean isNew = facade.addNoWait( noWait );
203            log.debug( "Called addNoWait, isNew = {0}", isNew );
204            return isNew;
205        }
206        if ( knownDifferentlyConfiguredRegions.addIfAbsent( noWait.getCacheName() ) )
207        {
208            log.info( "addNoWait > Different nodes are configured differently "
209                    + "or region [{0}] is not yet used on this side.",
210                    noWait::getCacheName);
211        }
212        return false;
213    }
214
215    /**
216     * Look up the facade for the name. If it doesn't exist, then the region is not configured for
217     * use with the lateral cache. If it is present, remove the item from the no wait list.
218     * <p>
219     * @param noWait
220     * @return true if we found the no wait and removed it. False if the no wait was not present.
221     */
222    protected <K, V> boolean removeNoWait( final LateralCacheNoWait<K, V> noWait )
223    {
224        @SuppressWarnings("unchecked") // Need to cast because of common map for all facades
225        final
226        LateralCacheNoWaitFacade<K, V> facade =
227            (LateralCacheNoWaitFacade<K, V>)facades.get( noWait.getCacheName() );
228        log.debug( "removeNoWait > Got facade for {0} = {1}", noWait.getCacheName(), facade);
229
230        return removeNoWait(facade, noWait.getCacheName(), noWait.getIdentityKey());
231    }
232
233    /**
234     * Remove the item from the no wait list.
235     * <p>
236     * @param facade
237     * @param cacheName
238     * @param tcpServer
239     * @return true if we found the no wait and removed it. False if the no wait was not present.
240     * @since 3.1
241     */
242    protected <K, V> boolean removeNoWait(final LateralCacheNoWaitFacade<K, V> facade,
243            final String cacheName, final String tcpServer)
244    {
245        if ( facade != null )
246        {
247            final boolean removed = facade.removeNoWait(tcpServer);
248            log.debug( "Called removeNoWait, removed {0}", removed );
249            return removed;
250        }
251        if (knownDifferentlyConfiguredRegions.addIfAbsent(cacheName))
252        {
253            log.info( "addNoWait > Different nodes are configured differently "
254                    + "or region [{0}] is not yet used on this side.",
255                    cacheName);
256        }
257        return false;
258    }
259
260    /**
261     * Creates the lateral cache if needed.
262     * <p>
263     * We could go to the composite cache manager and get the cache for the region. This would
264     * force a full configuration of the region. One advantage of this would be that the creation of
265     * the later would go through the factory, which would add the item to the no wait list. But we
266     * don't want to do this. This would force this client to have all the regions as the other.
267     * This might not be desired. We don't want to send or receive for a region here that is either
268     * not used or not configured to use the lateral.
269     * <p>
270     * Right now, I'm afraid that the region will get puts if another instance has the region
271     * configured to use the lateral and our address is configured. This might be a bug, but it
272     * shouldn't happen with discovery.
273     * <p>
274     * @param service
275     */
276    @Override
277    public void addDiscoveredService( final DiscoveredService service )
278    {
279        // get a cache and add it to the no waits
280        // the add method should not add the same.
281        // we need the listener port from the original config.
282        final ArrayList<String> regions = service.getCacheNames();
283        final String serverAndPort = service.getServiceAddress() + ":" + service.getServicePort();
284
285        if ( regions != null )
286        {
287            // for each region get the cache
288            for (final String cacheName : regions)
289            {
290                final LateralCacheNoWaitFacade<?, ?> facade = facades.get(cacheName);
291                log.debug( "Got cache facade {0}", facade );
292
293                // add this to the nowaits for this cachename
294                if (facade != null)
295                {
296                    // skip caches already there
297                    if (facade.containsNoWait(serverAndPort))
298                    {
299                        continue;
300                    }
301
302                    final ITCPLateralCacheAttributes lca =
303                            (ITCPLateralCacheAttributes) facade.getAuxiliaryCacheAttributes().clone();
304                    lca.setTcpServer(serverAndPort);
305
306                    LateralTCPCacheFactory factory =
307                            (LateralTCPCacheFactory) cacheManager.registryFacGet(factoryName);
308
309                    LateralCacheNoWait<?, ?> noWait =
310                            factory.createCacheNoWait(lca, cacheEventLogger, elementSerializer);
311                    factory.monitorCache(noWait);
312
313                    if (addNoWait(noWait))
314                    {
315                        log.debug("Added NoWait for cacheName [{0}] at {1}", cacheName, serverAndPort);
316                    }
317                }
318            }
319        }
320        else
321        {
322            log.warn( "No cache names found in message {0}", service );
323        }
324    }
325
326    /**
327     * Removes the lateral cache.
328     * <p>
329     * We need to tell the manager that this instance is bad, so it will reconnect the sender if it
330     * comes back.
331     * <p>
332     * @param service
333     */
334    @Override
335    public void removeDiscoveredService( final DiscoveredService service )
336    {
337        // get a cache and add it to the no waits
338        // the add method should not add the same.
339        // we need the listener port from the original config.
340        final ArrayList<String> regions = service.getCacheNames();
341        final String serverAndPort = service.getServiceAddress() + ":" + service.getServicePort();
342
343        if ( regions != null )
344        {
345            // for each region get the cache
346            for (final String cacheName : regions)
347            {
348                final LateralCacheNoWaitFacade<?, ?> facade = facades.get(cacheName);
349                log.debug( "Got cache facade {0}", facade );
350
351                // remove this from the nowaits for this cachename
352                if (facade != null && removeNoWait(facade, cacheName, serverAndPort))
353                {
354                    log.debug("Removed NoWait for cacheName [{0}] at {1}", cacheName, serverAndPort);
355                }
356            }
357        }
358        else
359        {
360            log.warn( "No cache names found in message {0}", service );
361        }
362    }
363}