QueueBean.java 8.77 KB
package fi.codecrew.moya.beans;

import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

import javax.ejb.Asynchronous;
import javax.ejb.EJB;
import javax.ejb.LocalBean;
import javax.ejb.Lock;
import javax.ejb.LockType;
import javax.ejb.Singleton;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import fi.codecrew.moya.facade.PlaceSlotFacade;
import fi.codecrew.moya.model.EventMap;
import fi.codecrew.moya.model.EventUser;
import fi.codecrew.moya.model.LanEventProperty;
import fi.codecrew.moya.model.LanEventPropertyKey;
import fi.codecrew.moya.model.PlaceSlot;
import fi.codecrew.moya.model.map.MapReservationQueueEntry;

/**
 * Session Bean implementation class QueueBean
 */
@Singleton
@LocalBean
public class QueueBean implements QueueBeanLocal {
	private static final Logger logger = LoggerFactory.getLogger(QueueBean.class);

	/**
	 * Default constructor.
	 */
	public QueueBean() {
		mapqueues = new HashMap<>();
		logger.info("Initialized2 QueueBean, {}", mapqueues);
	}

	private final Map<Integer, MapQueue> mapqueues;
	private int defaultTimeoutMin = 15;
	private int minimumSlotsInQueue = 15;
	private int reservingSize = 5;

	private class MapQueue {
		// private final Set<MapReservationQueueEntry> reserving = new HashSet<>();
		private final Set<EventUser> reserving = Collections.newSetFromMap(new ConcurrentHashMap<EventUser, Boolean>());
		private final LinkedBlockingQueue<EventUser> queue = new LinkedBlockingQueue<>();
		private final ConcurrentMap<EventUser, MapReservationQueueEntry> queEntries = new ConcurrentHashMap<>();

		//		public boolean enterReserving( EventUser user) {
		//			timeoutReserving();
		//
		//			if (reserving.size() < 3 && !queue.isEmpty() && queue.get(0).equals(entry)) {
		//				entry = queue.get(0);
		//				entry.removeFromQueue();
		//			}
		//		}

		private void timeoutEntries() {
			logger.info("Timeouting entries");
			// give 10 seconds mercy ( and give us some time to go through all entries)
			Date now = new Date(System.currentTimeMillis() + 1000 * 15);
			for (EventUser r : reserving) {
				MapReservationQueueEntry entry = queEntries.get(r);
				logger.info("Checking if should remove user from queue {}, timeout {}", r, entry.getReservationTimeout());
				if (entry.getReservationTimeout() == null || now.after(entry.getReservationTimeout())) {
					logger.info("Removing Eventuser {} from reserving queue due to reservationTimeout: {}", r, entry.getReservationTimeout());
					this.remove(r);
				}
			}

			// Set idle time to the past.
			// Idle timeout after 60 seconds
			Date idleTimeout = new Date(System.currentTimeMillis() - 1000 * 60);
			for (EventUser q : queue) {
				MapReservationQueueEntry entry = queEntries.get(q);
				if (entry.getSeenTime() == null) {
					entry.setSeenTime(new Date());
					continue;
				}
				if (idleTimeout.after(entry.getSeenTime())) {
					remove(entry.getUser());
				}

			}
		}

		public boolean isReserving(EventUser e) {

			// Check queue size and add entry to queue
			checkReservingEntry();
			MapReservationQueueEntry que = queEntries.get(e);
			if (que != null) {
				que.setSeenTime(new Date());
			}
			return reserving.contains(e);

		}

		private void checkReservingEntry() {
			if (reserving.size() < reservingSize) {
				synchronized (queue) {
					if (reserving.size() < reservingSize) {
						EventUser queEntry = queue.poll();
						if (queEntry != null) {
							reserving.add(queEntry);
							MapReservationQueueEntry ue = queEntries.get(queEntry);
							ue.setReservationTimeout(new Date(System.currentTimeMillis() + defaultTimeoutMin * 60 * 1000));
						}
					}
				}
			}
		}

		public MapReservationQueueEntry remove(EventUser user)
		{
			if (user == null) {
				return null;
			}
			MapReservationQueueEntry ret = null;

			synchronized (queue) {
				if (reserving.remove(user)) {
					logger.info("Removed user {} from reserving queue", user);
				}
				// There should neve be more than one instance, but make sure
				while (queue.remove(user)) {
					logger.info("Removed user {} from queue");
				}
				ret = queEntries.remove(user);
				checkReservingEntry();
			}
			return ret;
		}

		public MapReservationQueueEntry enter(EventUser user) {
			MapReservationQueueEntry ret = null;
			synchronized (queue) {
				if (!reserving.contains(user) && !queue.contains(user)) {

					ret = new MapReservationQueueEntry();
					queEntries.put(user, ret);
					boolean queStat = queue.offer(user);
					logger.info("User {} not in queue, offer state {}", user, queStat);

					// Check if the user can be put to reservation queue immediately
					checkReservingEntry();

				} else {
					ret = queEntries.get(user);
					logger.info("User {} already in queue. Not entering again {}", user, ret);
				}
			}
			return ret;

		}

		public Integer getPosition(EventUser user) {
			Integer ret = null;

			if (reserving.contains(user)) {
				ret = 0;
				logger.info("User in reserving queue {}", user);
			} else if (queue.contains(user)) {

				ret = 1;
				for (EventUser eu : queue) {
					if (eu.equals(user)) {
						break;
					}
					++ret;
				}
				logger.info("User is in queue {}, position {}", user, ret);
			} else {
				logger.info("Not in queue, while checking position");
			}
			logger.info("Got position {} for user {}", ret, user);
			return ret;
		}

		public boolean isInQueue(EventUser user) {
			return reserving.contains(user) || queue.contains(user);
		}

		public MapReservationQueueEntry getEntry(EventUser user) {
			return queEntries.get(user);
		}
	}

	@EJB
	private EventBeanLocal eventbean;
	private AtomicLong nextReservingTimeoutCheck = new AtomicLong();
	@EJB
	private PlaceSlotFacade slotfacade;

	@Lock(LockType.READ)
	@Override
	public boolean isReserving(EventMap map, EventUser user) {
		// If queue is not enabled, user can always reserve
		if (!isQueueEnabled())
			return true;
		if (map == null || user == null)
		{
			logger.warn("Can not check map {}, user {}", map, user);
		}
		boolean ret = getMapque(map).isReserving(user);

		// Do some housekeeping, but only on each 120

		long now = System.currentTimeMillis();
		long nextTime = nextReservingTimeoutCheck.get();
		// Update next checktime to 120 seconds in to the future, so we should have plenty of time
		// to do the checks we need..
		if (now > nextTime && nextReservingTimeoutCheck.compareAndSet(nextTime, now + 1000 * 120)) {
			logger.info("Launcing reservingTimeout check ");
			checkReservingTimeouts();
			logger.info("Done launching reservingTimeoutCheck");
		}

		return ret;

	}

	@Asynchronous
	private void checkReservingTimeouts() {
		try {
			final long oldTime = nextReservingTimeoutCheck.get();
			for (MapQueue m : mapqueues.values()) {
				m.timeoutEntries();
			}

			// DO housekeeping every 10 seconds.
			nextReservingTimeoutCheck.compareAndSet(oldTime, System.currentTimeMillis() + 1000 * 10);
		} catch (Throwable t) {
			logger.warn("Exception while checking reservingTimeouts", t);
		}
	}

	private MapQueue getMapque(EventMap map) {
		if (map == null) {
			return null;
		}
		MapQueue ret = mapqueues.get(map.getId());

		if (ret == null) {
			logger.info("getMapqueue, {}", mapqueues);
			synchronized (mapqueues) {
				ret = new MapQueue();
				mapqueues.put(map.getId(), ret);
			}
		}
		logger.info("returning queue {} for map {}", ret, map);
		return ret;
	}

	@Lock(LockType.READ)
	@Override
	public Integer getQueuePosition(EventMap map, EventUser user)
	{
		return getMapque(map).getPosition(user);
	}

	@Lock(LockType.READ)
	@Override
	public boolean isQueueEnabled() {
		return eventbean.getPropertyBoolean(LanEventPropertyKey.MAP_QUEUE);
	}

	@Lock(LockType.READ)
	@Override
	public MapReservationQueueEntry remove(EventMap map, EventUser user) {
		MapQueue queue = getMapque(map);
		return queue.remove(user);
	}

	@Override
	@Lock(LockType.READ)
	public MapReservationQueueEntry enterQueue(EventMap map, EventUser user) {
		if (!isQueueEnabled()) {
			return null;
		}
		MapQueue queue = getMapque(map);

		MapReservationQueueEntry ret = null;
		if (queue.isInQueue(user)) {
			logger.info("User {} already in queue");
			ret = queue.getEntry(user);
		} else {
			List<PlaceSlot> slots = slotfacade.findFreePlaceSlotsForProduct(user, null);
			logger.info("User {} not yet in queue. User has {} slots", user, slots.size());
			if (!slots.isEmpty() && slots.size() >= getMinimumSlotsInQueue()) {
				ret = queue.enter(user);
				logger.info("Entered queue: {}", ret);
			}
		}
		return ret;

	}

	@Override
	public int getMinimumSlotsInQueue() {
		return minimumSlotsInQueue;
	}

	@Override
	public void setMinimumSlotsInQueue(int minimumSlotsInQueue) {
		this.minimumSlotsInQueue = minimumSlotsInQueue;
	}

}