1 /** 2 * Copyright (c) 2012, University of Konstanz, Distributed Systems Group All rights reserved. 3 * 4 * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the 5 * following conditions are met: * Redistributions of source code must retain the above copyright notice, this list of 6 * conditions and the following disclaimer. * Redistributions in binary form must reproduce the above copyright notice, 7 * this list of conditions and the following disclaimer in the documentation and/or other materials provided with the 8 * distribution. * Neither the name of the University of Konstanz nor the names of its contributors may be used to 9 * endorse or promote products derived from this software without specific prior written permission. 10 * 11 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, 12 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 13 * DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, 14 * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 15 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, 16 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, 17 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 18 */ 19 /** 20 * 21 */ 22 23 package org.jscsi.initiator.connection; 24 25 26 import java.io.IOException; 27 import java.net.InetSocketAddress; 28 import java.nio.ByteBuffer; 29 import java.util.concurrent.ConcurrentHashMap; 30 import java.util.concurrent.ExecutionException; 31 import java.util.concurrent.ExecutorService; 32 import java.util.concurrent.Future; 33 import java.util.concurrent.LinkedBlockingQueue; 34 35 import org.jscsi.exception.NoSuchConnectionException; 36 import org.jscsi.exception.TaskExecutionException; 37 import org.jscsi.initiator.Configuration; 38 import org.jscsi.initiator.LinkFactory; 39 import org.jscsi.initiator.connection.phase.IPhase; 40 import org.jscsi.initiator.connection.phase.SecurityNegotiationPhase; 41 import org.jscsi.initiator.connection.state.LoginRequestState; 42 import org.jscsi.initiator.taskbalancer.AbstractTaskBalancer; 43 import org.jscsi.initiator.taskbalancer.SimpleTaskBalancer; 44 import org.jscsi.parser.datasegment.OperationalTextKey; 45 import org.jscsi.parser.datasegment.SettingsMap; 46 import org.jscsi.parser.login.LoginStage; 47 import org.jscsi.utils.SerialArithmeticNumber; 48 import org.slf4j.Logger; 49 import org.slf4j.LoggerFactory; 50 51 52 /** 53 * <h1>Session</h1> 54 * <p/> 55 * A session or Initiator Target Nexus is a directed communication from an iSCSI Initiator to an iSCSI Target. Each 56 * session can contain several connections. This allows a better usage of bandwidth and decreases latency times. The 57 * Abstract Class is used to implement serveral single- and multithreaded variants of Sessions 58 * 59 * @author Volker Wildi, University of Konstanz 60 * @author Patrice Matthias Brend'amour, University of Kontanz 61 * @author Sebastian Graf, University of Kontanz 62 */ 63 public final class Session { 64 65 /** The unique name of the connected iSCSI Target. */ 66 protected final String targetName; 67 68 /** The unique name of the connected iSCSI Target. */ 69 protected final InetSocketAddress inetSocketAddress; 70 71 /** The maximum number of connections, which are allowed in this session. */ 72 private int maxConnections; 73 74 /** The index of the next used connection ID. */ 75 protected short nextFreeConnectionID; 76 77 /** The session is in this phase. */ 78 protected IPhase phase; 79 80 /** 81 * This instance contains the informations about the capacity of the connected target. 82 */ 83 protected final TargetCapacityInformations capacityInformations; 84 85 /** A List object with all open connections. */ 86 protected final LinkedBlockingQueue<Connection> connections; 87 88 /** The Command Sequence Number of this session. */ 89 protected final SerialArithmeticNumber commandSequenceNumber; 90 91 /** The Maximum Command Sequence Number of this session. */ 92 protected final SerialArithmeticNumber maximumCommandSequenceNumber; 93 94 /** 95 * The initiator uses this Initiator Task Tag to relate data to the appropriate command. And the target uses this 96 * tag to correlate the data to the appropriate command that it received earlier. 97 */ 98 protected final SerialArithmeticNumber initiatorTaskTag; 99 100 /** 101 * Flag to indicate, if the login phase of this session is successfully completed. This flag is also used for the 102 * protection of a reseting of <code>targetSessionIdentifyingHandle</code>. 103 */ 104 protected boolean tsihChanged; 105 106 /** The Target Session Identifying Handle. */ 107 protected short targetSessionIdentifyingHandle; 108 109 /** The Logger interface. */ 110 private static final Logger LOGGER = LoggerFactory.getLogger(Session.class); 111 112 /** The <code>Configuration</code> instance for this session. */ 113 protected final Configuration configuration; 114 115 /** The <code>LinkFactory</code> instance for this session. */ 116 protected final LinkFactory factory; 117 118 /** Executor to work with all task to be commited. */ 119 private final ExecutorService executor; 120 121 /** Contains all queues, which are till now not successfully finished. */ 122 // FIXME: Support me! 123 private final ConcurrentHashMap<ITask , Connection> outstandingTasks; 124 125 /** 126 * Handles the load balancing of the task distribution to the opened connections. 127 */ 128 protected final AbstractTaskBalancer taskBalancer; 129 130 // -------------------------------------------------------------------------- 131 // -------------------------------------------------------------------------- 132 133 /** 134 * Constructor to create a new, empty <code>AbsSession</code> object with a maximum number of allowed connections to 135 * a given iSCSI Target. This is the abstract definition for Session implementations 136 * 137 * @param linkFactory The LinkFactory which called the Constructor 138 * @param initConfiguration The configuration to use within this session. 139 * @param initTargetName The name of the iSCSI Target. 140 * @param inetAddress The <code>InetSocketAddress</code> of the leading connection of this session. 141 * @param initExecutor The <code>ExecutorService</code> for the Connection Threads 142 * @throws Exception if anything happens 143 */ 144 145 public Session (final LinkFactory linkFactory, final Configuration initConfiguration, final String initTargetName, final InetSocketAddress inetAddress, final ExecutorService initExecutor) throws Exception { 146 147 maxConnections = Integer.parseInt(initConfiguration.getSessionSetting(initTargetName, OperationalTextKey.MAX_CONNECTIONS)); 148 factory = linkFactory; 149 configuration = initConfiguration; 150 commandSequenceNumber = new SerialArithmeticNumber(); 151 maximumCommandSequenceNumber = new SerialArithmeticNumber(1); 152 nextFreeConnectionID = 1; 153 inetSocketAddress = inetAddress; 154 initiatorTaskTag = new SerialArithmeticNumber(1); 155 targetName = initTargetName; 156 phase = new SecurityNegotiationPhase(); 157 capacityInformations = new TargetCapacityInformations(); 158 connections = new LinkedBlockingQueue<Connection>(maxConnections); 159 executor = initExecutor; 160 taskBalancer = new SimpleTaskBalancer(connections); 161 outstandingTasks = new ConcurrentHashMap<ITask , Connection>(); 162 163 // Add the leading connection 164 addNewConnection(); 165 166 /* 167 * We have to check whether the MaxConnection setting in our Configuration is correct. There might be a wrong 168 * setting for a target e.g. the target only supports one connection but we think it can handle two. 169 */ 170 maxConnections = Integer.parseInt(configuration.getSessionSetting(targetName, OperationalTextKey.MAX_CONNECTIONS)); 171 int targetMaxC = connections.peek().getSettingAsInt(OperationalTextKey.MAX_CONNECTIONS); 172 if (targetMaxC < maxConnections) { 173 maxConnections = targetMaxC; 174 } 175 176 // Add more Connections 177 // TODO Do something more intelligent here. Always adding the maximum 178 // isn't 179 // always a good idea 180 addConnections(maxConnections - 1); 181 182 } 183 184 /** 185 * Returns the Target Session Identifying Handle (TSID) of this <code>Session</code> object. 186 * 187 * @return The current Target Session Identifying Handle (TSIH) 188 */ 189 public final short getTargetSessionIdentifyingHandle () { 190 191 return targetSessionIdentifyingHandle; 192 } 193 194 /** 195 * Sets the Target Session Identifying Handle (TSIH) to the given value. This TSIH is specified at the Login Phase 196 * by the target in a new session. So, it can only set one time. 197 * 198 * @param tsih The new Target Session Identifying Handle. 199 */ 200 public final void setTargetSessionIdentifyingHandle (final short tsih) { 201 202 if (!tsihChanged) { 203 targetSessionIdentifyingHandle = tsih; 204 tsihChanged = true; 205 } 206 } 207 208 // -------------------------------------------------------------------------- 209 // -------------------------------------------------------------------------- 210 211 /** 212 * Returns the Command Sequence Number of this session. 213 * 214 * @return The current Command Sequence Number. 215 */ 216 public final int getCommandSequenceNumber () { 217 218 return commandSequenceNumber.getValue(); 219 } 220 221 /** 222 * Sets the Maximum Command Sequence Number to a new value. 223 * 224 * @param newMaximumCommandSequenceNumber The new Maximum Command Sequence Number. 225 */ 226 public final void setMaximumCommandSequenceNumber (final int newMaximumCommandSequenceNumber) { 227 228 maximumCommandSequenceNumber.setValue(newMaximumCommandSequenceNumber); 229 } 230 231 /** 232 * Returns the Maximum Command Sequence Number of this session. 233 * 234 * @return The current Maximum Command Sequence Number. 235 */ 236 public final SerialArithmeticNumber getMaximumCommandSequenceNumber () { 237 238 return maximumCommandSequenceNumber; 239 } 240 241 /** 242 * Returns the Initiator Task Tag of this session. 243 * 244 * @return The Initiator Task Tag. 245 */ 246 public final int getInitiatorTaskTag () { 247 248 return initiatorTaskTag.getValue(); 249 } 250 251 /** 252 * Increments the Initiator Task Tag as defined in RFC1982 where <code>SERIAL_BITS = 32</code>. 253 */ 254 public final void incrementInitiatorTaskTag () { 255 256 initiatorTaskTag.increment(); 257 } 258 259 /** 260 * Has the iSCSI Target enough resources to accept more incoming PDU? 261 * 262 * @return <code>true</code>, if the iSCSI Target has enough resources to accept more incoming PDUs. Else 263 * <code>false</code> and hold out for sending. 264 */ 265 public final boolean hasTargetMoreResources () { 266 267 return maximumCommandSequenceNumber.compareTo(commandSequenceNumber.getValue()) > 0; 268 } 269 270 // -------------------------------------------------------------------------- 271 // -------------------------------------------------------------------------- 272 273 /** 274 * Returns the name of the iSCSI Target of this session. 275 * 276 * @return The name of the iSCSI Target. 277 */ 278 public final String getTargetName () { 279 280 return targetName; 281 } 282 283 // -------------------------------------------------------------------------- 284 // -------------------------------------------------------------------------- 285 286 /** 287 * Adds a number of new connections to this session. 288 * 289 * @param max The number of Connections to open. 290 * @throws Exception if any error occurs. 291 */ 292 public final void addConnections (final int max) throws Exception { 293 294 if (connections.size() < maxConnections) { 295 for (int i = 1; i < max; i++) { 296 addNewConnection(); 297 } 298 } 299 } 300 301 /** 302 * Adds a new connection to this session with the next free connection ID (if the maximum number is not reached). 303 * 304 * @return The connection ID of the newly created connection. 305 * @throws Exception if any error occurs. 306 */ 307 protected final short addNewConnection () throws Exception { 308 309 if (connections.size() < maxConnections) { 310 311 final Connection connection = factory.getConnection(this, configuration, inetSocketAddress, nextFreeConnectionID); 312 connection.nextState(new LoginRequestState(connection, LoginStage.FULL_FEATURE_PHASE)); 313 // login phase successful, so we can add a new connection 314 connections.add(connection); 315 316 // only needed on the leading login connection 317 if (connections.size() == 1) { 318 phase.getCapacity(this, capacityInformations); 319 320 if (connection.getSettingAsInt(OperationalTextKey.MAX_CONNECTIONS) > 1) { 321 phase.login(this); 322 } 323 } 324 325 return nextFreeConnectionID++; 326 } else { 327 LOGGER.warn("Unused new connection -> ignored!"); 328 return nextFreeConnectionID; 329 } 330 331 } 332 333 /** 334 * Updates the MaxConnection setting, so that it grows/shrinks the Connectionlist. 335 * 336 * @param max The maximum number of concurrent <code>Connections</code> to a target. 337 */ 338 public void updateMaxConnections (final int max) { 339 340 try { 341 Connection conn = taskBalancer.getConnection(); 342 int update = 0; 343 int targetMaxC = connections.peek().getSettingAsInt(OperationalTextKey.MAX_CONNECTIONS); 344 if (targetMaxC <= max) { 345 if (targetMaxC > maxConnections) { 346 update = targetMaxC - maxConnections; 347 maxConnections = targetMaxC; 348 } 349 } else { 350 if (max >= maxConnections) { 351 update = max - maxConnections; 352 maxConnections = max; 353 } 354 } 355 356 SettingsMap sm = new SettingsMap(); 357 sm.add(OperationalTextKey.MAX_CONNECTIONS, String.valueOf(maxConnections)); 358 conn.update(sm); 359 taskBalancer.releaseConnection(conn); 360 361 if (update > 0) { 362 addConnections(update); 363 } else { 364 for (int i = -1; i >= update; i--) { 365 taskBalancer.getConnection().close(); 366 } 367 } 368 369 } catch (Exception e) { 370 // DO Nothing 371 } 372 373 } 374 375 /** 376 * Returns the next free <code>Connection</code> object of this <code>Session</code> object. 377 * 378 * @return The connection to use for the next task. 379 * @throws NoSuchConnectionException If there is no such connection. 380 */ 381 public final Connection getNextFreeConnection () throws NoSuchConnectionException { 382 383 return taskBalancer.getConnection(); 384 } 385 386 /** 387 * Increments the Command Sequence Number as defined in RFC1982, where <code>SERIAL_BITS = 32</code>. 388 */ 389 public final void incrementCommandSequenceNumber () { 390 391 commandSequenceNumber.increment(); 392 } 393 394 // -------------------------------------------------------------------------- 395 // -------------------------------------------------------------------------- 396 397 /** 398 * Closes this session instances with all opened connections. 399 * 400 * @throws IOException if an I/O error occurs. 401 */ 402 public final void close () throws IOException { 403 404 LOGGER.info("Closing was requested."); 405 406 for (Connection c : connections) { 407 c.close(); 408 } 409 410 connections.clear(); 411 412 // stop session task thread 413 factory.closedSession(this); 414 executor.shutdown(); 415 } 416 417 /** 418 * Returns the used block size of the connected iSCSI Target. 419 * 420 * @return The used block size in bytes. 421 */ 422 public final long getBlockSize () { 423 424 return capacityInformations.getBlockSize(); 425 } 426 427 /** 428 * Returns the capacity (in blocks) of the connected iSCSI Target. 429 * 430 * @return The capacity in blocks. 431 */ 432 public final long getCapacity () { 433 434 return capacityInformations.getSize(); 435 } 436 437 // -------------------------------------------------------------------------- 438 // -------------------------------------------------------------------------- 439 440 /** 441 * This method invokes the same called method of the current <code>IPhase</code> instance. 442 * 443 * 444 * @throws Exception if any error occurs. 445 */ 446 public final void login () throws Exception { 447 448 executeTask(new LoginTask(this)); 449 } 450 451 /** 452 * This method invokes the same called method of the current <code>IPhase</code> instance. 453 * 454 * 455 * @throws TaskExecutionException if any error occurs. 456 */ 457 public final void logout () throws TaskExecutionException { 458 459 executeTask(new LogoutTask(this)); 460 } 461 462 // -------------------------------------------------------------------------- 463 // -------------------------------------------------------------------------- 464 465 /** 466 * This method invokes the same called method of the current <code>IPhase</code> instance. 467 * 468 * 469 * @param dst Store the read bytes to this buffer. 470 * @param logicalBlockAddress The logical block address of the device to begin the read operation. 471 * @param transferLength The number of bytes to read from the device. 472 * @throws Exception if any error occurs. 473 * @return The Future object. 474 * @throws TaskExecutionException if execution fails 475 */ 476 public final Future<Void> read (final ByteBuffer dst, final int logicalBlockAddress, final long transferLength) throws TaskExecutionException { 477 478 return executeTask(new ReadTask(this, dst, logicalBlockAddress, transferLength)); 479 } 480 481 /** 482 * This method invokes the same called method of the current <code>IPhase</code> instance. 483 * 484 * 485 * @param src Write the remaining bytes to the device. 486 * @param logicalBlockAddress The logical block address of the device to begin the write operation. 487 * @param transferLength The number of bytes to write to the device. 488 * @throws Exception if any error occurs. 489 * @return The Future object. 490 * @throws TaskExecutionException if execution fails 491 */ 492 public final Future<Void> write (final ByteBuffer src, final int logicalBlockAddress, final long transferLength) throws TaskExecutionException { 493 494 return executeTask(new WriteTask(this, src, logicalBlockAddress, transferLength)); 495 } 496 497 // -------------------------------------------------------------------------- 498 // -------------------------------------------------------------------------- 499 500 // -------------------------------------------------------------------------- 501 // -------------------------------------------------------------------------- 502 503 /** 504 * Returns the current <code>LoginStage</code> object. 505 * 506 * @return The instance to the current <code>LoginStage</code>. 507 */ 508 public final LoginStage getPhase () { 509 510 return phase.getStage(); 511 } 512 513 /** 514 * This method sets the current <code>IPhase</code> instance to the given value. 515 * 516 * @param newPhase The new instance to switch to. 517 */ 518 public final void setPhase (final IPhase newPhase) { 519 520 phase = newPhase; 521 LOGGER.trace("Switching to phase " + newPhase.getClass().getSimpleName()); 522 523 } 524 525 /** 526 * This methods appends the given task to the end of the taskQueue and set the calling thread is sleep state. 527 * 528 * @param task The task to append to the end of the taskQueue. 529 * @throws InterruptedException if another thread interrupted the current thread before or while the current thread 530 * was waiting for a notification. The interrupted status of the current thread is cleared when this 531 * exception is thrown. 532 * @throws ExecutionException if anything happens while execution 533 */ 534 private final Future<Void> executeTask (final ITask task) throws TaskExecutionException { 535 536 if (task instanceof IOTask) { 537 final Future<Void> returnVal = executor.submit((IOTask) task); 538 return returnVal; 539 } else { 540 try { 541 task.call(); 542 } catch (final Exception exc) { 543 throw new TaskExecutionException(new ExecutionException(exc)); 544 } 545 return null; 546 } 547 // LOGGER.info("Added a " + task + " to the TaskQueue"); 548 } 549 550 /** 551 * removes Task from outstandingTasks. 552 * 553 * @param ftask The Task which was finished . 554 */ 555 public final void finishedTask (final ITask ftask) { 556 557 try { 558 taskBalancer.releaseConnection(outstandingTasks.get(ftask)); 559 } catch (NoSuchConnectionException e) { 560 e.printStackTrace(); 561 } 562 outstandingTasks.remove(ftask); 563 LOGGER.debug("Finished a " + ftask + " for the session " + targetName); 564 } 565 566 /** 567 * restarts a Task from outstandingTasks. 568 * 569 * @param task The failed Task. 570 * @throws ExecutionException for failed restart of the task 571 */ 572 public final void restartTask (final ITask task) throws ExecutionException { 573 574 try { 575 if (task != null) { 576 if (task instanceof IOTask) { 577 executor.submit((IOTask) task); 578 } else { 579 task.call(); 580 } 581 taskBalancer.releaseConnection(outstandingTasks.get(task)); 582 outstandingTasks.remove(task); 583 } 584 LOGGER.debug("Restarted a Task out of the outstandingTasks Queue"); 585 } catch (Exception e) { 586 throw new ExecutionException(e); 587 } 588 } 589 590 /** 591 * Adds a Task to the outstandingTasks Hashmap. 592 * 593 * @param connection The Connection where the Task will be started 594 * @param task The Task which was started. 595 */ 596 public final void addOutstandingTask (final Connection connection, final ITask task) { 597 598 outstandingTasks.put(task, connection); 599 LOGGER.debug("Added a Task to the outstandingTasks Queue"); 600 } 601 602 /** 603 * Adds a Task to the outstandingTasks Hashmap. 604 * 605 * @param connection The Connection which will be released 606 * @throws NoSuchConnectionException if any errors occur 607 */ 608 public final void releaseUsedConnection (final Connection connection) throws NoSuchConnectionException { 609 610 taskBalancer.releaseConnection(connection); 611 } 612 613 // -------------------------------------------------------------------------- 614 // -------------------------------------------------------------------------- 615 // -------------------------------------------------------------------------- 616 // -------------------------------------------------------------------------- 617 618 }