View Javadoc

1   /**
2    * Copyright (c) 2012, University of Konstanz, Distributed Systems Group All rights reserved.
3    * 
4    * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
5    * following conditions are met: * Redistributions of source code must retain the above copyright notice, this list of
6    * conditions and the following disclaimer. * Redistributions in binary form must reproduce the above copyright notice,
7    * this list of conditions and the following disclaimer in the documentation and/or other materials provided with the
8    * distribution. * Neither the name of the University of Konstanz nor the names of its contributors may be used to
9    * endorse or promote products derived from this software without specific prior written permission.
10   * 
11   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
12   * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
13   * DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
14   * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
15   * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
16   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
17   * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
18   */
19  /**
20   * 
21   */
22  
23  package org.jscsi.initiator.connection;
24  
25  
26  import java.io.IOException;
27  import java.net.InetSocketAddress;
28  import java.nio.ByteBuffer;
29  import java.util.concurrent.ConcurrentHashMap;
30  import java.util.concurrent.ExecutionException;
31  import java.util.concurrent.ExecutorService;
32  import java.util.concurrent.Future;
33  import java.util.concurrent.LinkedBlockingQueue;
34  
35  import org.jscsi.exception.NoSuchConnectionException;
36  import org.jscsi.exception.TaskExecutionException;
37  import org.jscsi.initiator.Configuration;
38  import org.jscsi.initiator.LinkFactory;
39  import org.jscsi.initiator.connection.phase.IPhase;
40  import org.jscsi.initiator.connection.phase.SecurityNegotiationPhase;
41  import org.jscsi.initiator.connection.state.LoginRequestState;
42  import org.jscsi.initiator.taskbalancer.AbstractTaskBalancer;
43  import org.jscsi.initiator.taskbalancer.SimpleTaskBalancer;
44  import org.jscsi.parser.datasegment.OperationalTextKey;
45  import org.jscsi.parser.datasegment.SettingsMap;
46  import org.jscsi.parser.login.LoginStage;
47  import org.jscsi.utils.SerialArithmeticNumber;
48  import org.slf4j.Logger;
49  import org.slf4j.LoggerFactory;
50  
51  
52  /**
53   * <h1>Session</h1>
54   * <p/>
55   * A session or Initiator Target Nexus is a directed communication from an iSCSI Initiator to an iSCSI Target. Each
56   * session can contain several connections. This allows a better usage of bandwidth and decreases latency times. The
57   * Abstract Class is used to implement serveral single- and multithreaded variants of Sessions
58   * 
59   * @author Volker Wildi, University of Konstanz
60   * @author Patrice Matthias Brend'amour, University of Kontanz
61   * @author Sebastian Graf, University of Kontanz
62   */
63  public final class Session {
64  
65      /** The unique name of the connected iSCSI Target. */
66      protected final String targetName;
67  
68      /** The unique name of the connected iSCSI Target. */
69      protected final InetSocketAddress inetSocketAddress;
70  
71      /** The maximum number of connections, which are allowed in this session. */
72      private int maxConnections;
73  
74      /** The index of the next used connection ID. */
75      protected short nextFreeConnectionID;
76  
77      /** The session is in this phase. */
78      protected IPhase phase;
79  
80      /**
81       * This instance contains the informations about the capacity of the connected target.
82       */
83      protected final TargetCapacityInformations capacityInformations;
84  
85      /** A List object with all open connections. */
86      protected final LinkedBlockingQueue<Connection> connections;
87  
88      /** The Command Sequence Number of this session. */
89      protected final SerialArithmeticNumber commandSequenceNumber;
90  
91      /** The Maximum Command Sequence Number of this session. */
92      protected final SerialArithmeticNumber maximumCommandSequenceNumber;
93  
94      /**
95       * The initiator uses this Initiator Task Tag to relate data to the appropriate command. And the target uses this
96       * tag to correlate the data to the appropriate command that it received earlier.
97       */
98      protected final SerialArithmeticNumber initiatorTaskTag;
99  
100     /**
101      * Flag to indicate, if the login phase of this session is successfully completed. This flag is also used for the
102      * protection of a reseting of <code>targetSessionIdentifyingHandle</code>.
103      */
104     protected boolean tsihChanged;
105 
106     /** The Target Session Identifying Handle. */
107     protected short targetSessionIdentifyingHandle;
108 
109     /** The Logger interface. */
110     private static final Logger LOGGER = LoggerFactory.getLogger(Session.class);
111 
112     /** The <code>Configuration</code> instance for this session. */
113     protected final Configuration configuration;
114 
115     /** The <code>LinkFactory</code> instance for this session. */
116     protected final LinkFactory factory;
117 
118     /** Executor to work with all task to be commited. */
119     private final ExecutorService executor;
120 
121     /** Contains all queues, which are till now not successfully finished. */
122     // FIXME: Support me!
123     private final ConcurrentHashMap<ITask , Connection> outstandingTasks;
124 
125     /**
126      * Handles the load balancing of the task distribution to the opened connections.
127      */
128     protected final AbstractTaskBalancer taskBalancer;
129 
130     // --------------------------------------------------------------------------
131     // --------------------------------------------------------------------------
132 
133     /**
134      * Constructor to create a new, empty <code>AbsSession</code> object with a maximum number of allowed connections to
135      * a given iSCSI Target. This is the abstract definition for Session implementations
136      * 
137      * @param linkFactory The LinkFactory which called the Constructor
138      * @param initConfiguration The configuration to use within this session.
139      * @param initTargetName The name of the iSCSI Target.
140      * @param inetAddress The <code>InetSocketAddress</code> of the leading connection of this session.
141      * @param initExecutor The <code>ExecutorService</code> for the Connection Threads
142      * @throws Exception if anything happens
143      */
144 
145     public Session (final LinkFactory linkFactory, final Configuration initConfiguration, final String initTargetName, final InetSocketAddress inetAddress, final ExecutorService initExecutor) throws Exception {
146 
147         maxConnections = Integer.parseInt(initConfiguration.getSessionSetting(initTargetName, OperationalTextKey.MAX_CONNECTIONS));
148         factory = linkFactory;
149         configuration = initConfiguration;
150         commandSequenceNumber = new SerialArithmeticNumber();
151         maximumCommandSequenceNumber = new SerialArithmeticNumber(1);
152         nextFreeConnectionID = 1;
153         inetSocketAddress = inetAddress;
154         initiatorTaskTag = new SerialArithmeticNumber(1);
155         targetName = initTargetName;
156         phase = new SecurityNegotiationPhase();
157         capacityInformations = new TargetCapacityInformations();
158         connections = new LinkedBlockingQueue<Connection>(maxConnections);
159         executor = initExecutor;
160         taskBalancer = new SimpleTaskBalancer(connections);
161         outstandingTasks = new ConcurrentHashMap<ITask , Connection>();
162 
163         // Add the leading connection
164         addNewConnection();
165 
166         /*
167          * We have to check whether the MaxConnection setting in our Configuration is correct. There might be a wrong
168          * setting for a target e.g. the target only supports one connection but we think it can handle two.
169          */
170         maxConnections = Integer.parseInt(configuration.getSessionSetting(targetName, OperationalTextKey.MAX_CONNECTIONS));
171         int targetMaxC = connections.peek().getSettingAsInt(OperationalTextKey.MAX_CONNECTIONS);
172         if (targetMaxC < maxConnections) {
173             maxConnections = targetMaxC;
174         }
175 
176         // Add more Connections
177         // TODO Do something more intelligent here. Always adding the maximum
178         // isn't
179         // always a good idea
180         addConnections(maxConnections - 1);
181 
182     }
183 
184     /**
185      * Returns the Target Session Identifying Handle (TSID) of this <code>Session</code> object.
186      * 
187      * @return The current Target Session Identifying Handle (TSIH)
188      */
189     public final short getTargetSessionIdentifyingHandle () {
190 
191         return targetSessionIdentifyingHandle;
192     }
193 
194     /**
195      * Sets the Target Session Identifying Handle (TSIH) to the given value. This TSIH is specified at the Login Phase
196      * by the target in a new session. So, it can only set one time.
197      * 
198      * @param tsih The new Target Session Identifying Handle.
199      */
200     public final void setTargetSessionIdentifyingHandle (final short tsih) {
201 
202         if (!tsihChanged) {
203             targetSessionIdentifyingHandle = tsih;
204             tsihChanged = true;
205         }
206     }
207 
208     // --------------------------------------------------------------------------
209     // --------------------------------------------------------------------------
210 
211     /**
212      * Returns the Command Sequence Number of this session.
213      * 
214      * @return The current Command Sequence Number.
215      */
216     public final int getCommandSequenceNumber () {
217 
218         return commandSequenceNumber.getValue();
219     }
220 
221     /**
222      * Sets the Maximum Command Sequence Number to a new value.
223      * 
224      * @param newMaximumCommandSequenceNumber The new Maximum Command Sequence Number.
225      */
226     public final void setMaximumCommandSequenceNumber (final int newMaximumCommandSequenceNumber) {
227 
228         maximumCommandSequenceNumber.setValue(newMaximumCommandSequenceNumber);
229     }
230 
231     /**
232      * Returns the Maximum Command Sequence Number of this session.
233      * 
234      * @return The current Maximum Command Sequence Number.
235      */
236     public final SerialArithmeticNumber getMaximumCommandSequenceNumber () {
237 
238         return maximumCommandSequenceNumber;
239     }
240 
241     /**
242      * Returns the Initiator Task Tag of this session.
243      * 
244      * @return The Initiator Task Tag.
245      */
246     public final int getInitiatorTaskTag () {
247 
248         return initiatorTaskTag.getValue();
249     }
250 
251     /**
252      * Increments the Initiator Task Tag as defined in RFC1982 where <code>SERIAL_BITS = 32</code>.
253      */
254     public final void incrementInitiatorTaskTag () {
255 
256         initiatorTaskTag.increment();
257     }
258 
259     /**
260      * Has the iSCSI Target enough resources to accept more incoming PDU?
261      * 
262      * @return <code>true</code>, if the iSCSI Target has enough resources to accept more incoming PDUs. Else
263      *         <code>false</code> and hold out for sending.
264      */
265     public final boolean hasTargetMoreResources () {
266 
267         return maximumCommandSequenceNumber.compareTo(commandSequenceNumber.getValue()) > 0;
268     }
269 
270     // --------------------------------------------------------------------------
271     // --------------------------------------------------------------------------
272 
273     /**
274      * Returns the name of the iSCSI Target of this session.
275      * 
276      * @return The name of the iSCSI Target.
277      */
278     public final String getTargetName () {
279 
280         return targetName;
281     }
282 
283     // --------------------------------------------------------------------------
284     // --------------------------------------------------------------------------
285 
286     /**
287      * Adds a number of new connections to this session.
288      * 
289      * @param max The number of Connections to open.
290      * @throws Exception if any error occurs.
291      */
292     public final void addConnections (final int max) throws Exception {
293 
294         if (connections.size() < maxConnections) {
295             for (int i = 1; i < max; i++) {
296                 addNewConnection();
297             }
298         }
299     }
300 
301     /**
302      * Adds a new connection to this session with the next free connection ID (if the maximum number is not reached).
303      * 
304      * @return The connection ID of the newly created connection.
305      * @throws Exception if any error occurs.
306      */
307     protected final short addNewConnection () throws Exception {
308 
309         if (connections.size() < maxConnections) {
310 
311             final Connection connection = factory.getConnection(this, configuration, inetSocketAddress, nextFreeConnectionID);
312             connection.nextState(new LoginRequestState(connection, LoginStage.FULL_FEATURE_PHASE));
313             // login phase successful, so we can add a new connection
314             connections.add(connection);
315 
316             // only needed on the leading login connection
317             if (connections.size() == 1) {
318                 phase.getCapacity(this, capacityInformations);
319 
320                 if (connection.getSettingAsInt(OperationalTextKey.MAX_CONNECTIONS) > 1) {
321                     phase.login(this);
322                 }
323             }
324 
325             return nextFreeConnectionID++;
326         } else {
327             LOGGER.warn("Unused new connection -> ignored!");
328             return nextFreeConnectionID;
329         }
330 
331     }
332 
333     /**
334      * Updates the MaxConnection setting, so that it grows/shrinks the Connectionlist.
335      * 
336      * @param max The maximum number of concurrent <code>Connections</code> to a target.
337      */
338     public void updateMaxConnections (final int max) {
339 
340         try {
341             Connection conn = taskBalancer.getConnection();
342             int update = 0;
343             int targetMaxC = connections.peek().getSettingAsInt(OperationalTextKey.MAX_CONNECTIONS);
344             if (targetMaxC <= max) {
345                 if (targetMaxC > maxConnections) {
346                     update = targetMaxC - maxConnections;
347                     maxConnections = targetMaxC;
348                 }
349             } else {
350                 if (max >= maxConnections) {
351                     update = max - maxConnections;
352                     maxConnections = max;
353                 }
354             }
355 
356             SettingsMap sm = new SettingsMap();
357             sm.add(OperationalTextKey.MAX_CONNECTIONS, String.valueOf(maxConnections));
358             conn.update(sm);
359             taskBalancer.releaseConnection(conn);
360 
361             if (update > 0) {
362                 addConnections(update);
363             } else {
364                 for (int i = -1; i >= update; i--) {
365                     taskBalancer.getConnection().close();
366                 }
367             }
368 
369         } catch (Exception e) {
370             // DO Nothing
371         }
372 
373     }
374 
375     /**
376      * Returns the next free <code>Connection</code> object of this <code>Session</code> object.
377      * 
378      * @return The connection to use for the next task.
379      * @throws NoSuchConnectionException If there is no such connection.
380      */
381     public final Connection getNextFreeConnection () throws NoSuchConnectionException {
382 
383         return taskBalancer.getConnection();
384     }
385 
386     /**
387      * Increments the Command Sequence Number as defined in RFC1982, where <code>SERIAL_BITS = 32</code>.
388      */
389     public final void incrementCommandSequenceNumber () {
390 
391         commandSequenceNumber.increment();
392     }
393 
394     // --------------------------------------------------------------------------
395     // --------------------------------------------------------------------------
396 
397     /**
398      * Closes this session instances with all opened connections.
399      * 
400      * @throws IOException if an I/O error occurs.
401      */
402     public final void close () throws IOException {
403 
404         LOGGER.info("Closing was requested.");
405 
406         for (Connection c : connections) {
407             c.close();
408         }
409 
410         connections.clear();
411 
412         // stop session task thread
413         factory.closedSession(this);
414         executor.shutdown();
415     }
416 
417     /**
418      * Returns the used block size of the connected iSCSI Target.
419      * 
420      * @return The used block size in bytes.
421      */
422     public final long getBlockSize () {
423 
424         return capacityInformations.getBlockSize();
425     }
426 
427     /**
428      * Returns the capacity (in blocks) of the connected iSCSI Target.
429      * 
430      * @return The capacity in blocks.
431      */
432     public final long getCapacity () {
433 
434         return capacityInformations.getSize();
435     }
436 
437     // --------------------------------------------------------------------------
438     // --------------------------------------------------------------------------
439 
440     /**
441      * This method invokes the same called method of the current <code>IPhase</code> instance.
442      * 
443      * 
444      * @throws Exception if any error occurs.
445      */
446     public final void login () throws Exception {
447 
448         executeTask(new LoginTask(this));
449     }
450 
451     /**
452      * This method invokes the same called method of the current <code>IPhase</code> instance.
453      * 
454      * 
455      * @throws TaskExecutionException if any error occurs.
456      */
457     public final void logout () throws TaskExecutionException {
458 
459         executeTask(new LogoutTask(this));
460     }
461 
462     // --------------------------------------------------------------------------
463     // --------------------------------------------------------------------------
464 
465     /**
466      * This method invokes the same called method of the current <code>IPhase</code> instance.
467      * 
468      * 
469      * @param dst Store the read bytes to this buffer.
470      * @param logicalBlockAddress The logical block address of the device to begin the read operation.
471      * @param transferLength The number of bytes to read from the device.
472      * @throws Exception if any error occurs.
473      * @return The Future object.
474      * @throws TaskExecutionException if execution fails
475      */
476     public final Future<Void> read (final ByteBuffer dst, final int logicalBlockAddress, final long transferLength) throws TaskExecutionException {
477 
478         return executeTask(new ReadTask(this, dst, logicalBlockAddress, transferLength));
479     }
480 
481     /**
482      * This method invokes the same called method of the current <code>IPhase</code> instance.
483      * 
484      * 
485      * @param src Write the remaining bytes to the device.
486      * @param logicalBlockAddress The logical block address of the device to begin the write operation.
487      * @param transferLength The number of bytes to write to the device.
488      * @throws Exception if any error occurs.
489      * @return The Future object.
490      * @throws TaskExecutionException if execution fails
491      */
492     public final Future<Void> write (final ByteBuffer src, final int logicalBlockAddress, final long transferLength) throws TaskExecutionException {
493 
494         return executeTask(new WriteTask(this, src, logicalBlockAddress, transferLength));
495     }
496 
497     // --------------------------------------------------------------------------
498     // --------------------------------------------------------------------------
499 
500     // --------------------------------------------------------------------------
501     // --------------------------------------------------------------------------
502 
503     /**
504      * Returns the current <code>LoginStage</code> object.
505      * 
506      * @return The instance to the current <code>LoginStage</code>.
507      */
508     public final LoginStage getPhase () {
509 
510         return phase.getStage();
511     }
512 
513     /**
514      * This method sets the current <code>IPhase</code> instance to the given value.
515      * 
516      * @param newPhase The new instance to switch to.
517      */
518     public final void setPhase (final IPhase newPhase) {
519 
520         phase = newPhase;
521         LOGGER.trace("Switching to phase " + newPhase.getClass().getSimpleName());
522 
523     }
524 
525     /**
526      * This methods appends the given task to the end of the taskQueue and set the calling thread is sleep state.
527      * 
528      * @param task The task to append to the end of the taskQueue.
529      * @throws InterruptedException if another thread interrupted the current thread before or while the current thread
530      *             was waiting for a notification. The interrupted status of the current thread is cleared when this
531      *             exception is thrown.
532      * @throws ExecutionException if anything happens while execution
533      */
534     private final Future<Void> executeTask (final ITask task) throws TaskExecutionException {
535 
536         if (task instanceof IOTask) {
537             final Future<Void> returnVal = executor.submit((IOTask) task);
538             return returnVal;
539         } else {
540             try {
541                 task.call();
542             } catch (final Exception exc) {
543                 throw new TaskExecutionException(new ExecutionException(exc));
544             }
545             return null;
546         }
547         // LOGGER.info("Added a " + task + " to the TaskQueue");
548     }
549 
550     /**
551      * removes Task from outstandingTasks.
552      * 
553      * @param ftask The Task which was finished .
554      */
555     public final void finishedTask (final ITask ftask) {
556 
557         try {
558             taskBalancer.releaseConnection(outstandingTasks.get(ftask));
559         } catch (NoSuchConnectionException e) {
560             e.printStackTrace();
561         }
562         outstandingTasks.remove(ftask);
563         LOGGER.debug("Finished a " + ftask + " for the session " + targetName);
564     }
565 
566     /**
567      * restarts a Task from outstandingTasks.
568      * 
569      * @param task The failed Task.
570      * @throws ExecutionException for failed restart of the task
571      */
572     public final void restartTask (final ITask task) throws ExecutionException {
573 
574         try {
575             if (task != null) {
576                 if (task instanceof IOTask) {
577                     executor.submit((IOTask) task);
578                 } else {
579                     task.call();
580                 }
581                 taskBalancer.releaseConnection(outstandingTasks.get(task));
582                 outstandingTasks.remove(task);
583             }
584             LOGGER.debug("Restarted a Task out of the outstandingTasks Queue");
585         } catch (Exception e) {
586             throw new ExecutionException(e);
587         }
588     }
589 
590     /**
591      * Adds a Task to the outstandingTasks Hashmap.
592      * 
593      * @param connection The Connection where the Task will be started
594      * @param task The Task which was started.
595      */
596     public final void addOutstandingTask (final Connection connection, final ITask task) {
597 
598         outstandingTasks.put(task, connection);
599         LOGGER.debug("Added a Task to the outstandingTasks Queue");
600     }
601 
602     /**
603      * Adds a Task to the outstandingTasks Hashmap.
604      * 
605      * @param connection The Connection which will be released
606      * @throws NoSuchConnectionException if any errors occur
607      */
608     public final void releaseUsedConnection (final Connection connection) throws NoSuchConnectionException {
609 
610         taskBalancer.releaseConnection(connection);
611     }
612 
613     // --------------------------------------------------------------------------
614     // --------------------------------------------------------------------------
615     // --------------------------------------------------------------------------
616     // --------------------------------------------------------------------------
617 
618 }