/**
 * Copyright (c) 2012, University of Konstanz, Distributed Systems Group All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
 * following conditions are met: * Redistributions of source code must retain the above copyright notice, this list of
 * conditions and the following disclaimer. * Redistributions in binary form must reproduce the above copyright notice,
 * this list of conditions and the following disclaimer in the documentation and/or other materials provided with the
 * distribution. * Neither the name of the University of Konstanz nor the names of its contributors may be used to
 * endorse or promote products derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
 * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.jscsi.initiator.connection;


import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SocketChannel;
import java.security.DigestException;

import org.jscsi.exception.InternetSCSIException;
import org.jscsi.parser.InitiatorMessageParser;
import org.jscsi.parser.ProtocolDataUnit;
import org.jscsi.parser.ProtocolDataUnitFactory;
import org.jscsi.parser.TargetMessageParser;
import org.jscsi.parser.datasegment.OperationalTextKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * <h1>SenderWorker</h1>
 * <p>
 * Sends and receives the protocol data units (PDUs) of one {@link Connection} over that connection's socket
 * channel, and keeps the sequence numbers of the connection and its session up to date while doing so.
 *
 * @author Volker Wildi
 */
public final class SenderWorker {

    /** The logger interface. */
    private static final Logger LOGGER = LoggerFactory.getLogger(SenderWorker.class);

    /** The <code>Connection</code> instance this worker belongs to. */
    private final Connection connection;

    /**
     * Socket channel used for the data transfer. It is opened with {@link SocketChannel#open(java.net.SocketAddress)}
     * and its blocking mode is never changed by this class.
     */
    private final SocketChannel socketChannel;

    /** Factory for creating the <code>ProtocolDataUnit</code> instances that incoming data is read into. */
    private final ProtocolDataUnitFactory protocolDataUnitFactory;

    /**
     * Creates a new <code>SenderWorker</code> and connects it to the given target address.
     *
     * @param initConnection The reference connection of this worker.
     * @param inetAddress The InetSocketAddress of the Target.
     * @throws IOException if the channel cannot be opened or configured. In the latter case the already opened
     *             channel is closed again before the exception is propagated.
     */
    public SenderWorker (final Connection initConnection, final InetSocketAddress inetAddress) throws IOException {

        connection = initConnection;
        socketChannel = openChannel(inetAddress);
        protocolDataUnitFactory = new ProtocolDataUnitFactory();
    }

    /**
     * Opens a channel connected to the given address and enables TCP_NODELAY on its socket, closing the channel again
     * if that configuration step fails so that no connected socket is leaked.
     *
     * @param inetAddress The address to connect to.
     * @return The connected and configured channel.
     * @throws IOException if opening or configuring the channel fails.
     */
    private static SocketChannel openChannel (final InetSocketAddress inetAddress) throws IOException {

        final SocketChannel channel = SocketChannel.open(inetAddress);

        boolean configured = false;
        try {
            channel.socket().setTcpNoDelay(true);
            configured = true;
        } finally {
            if (!configured) {
                try {
                    channel.close();
                } catch (IOException ignored) {
                    // Deliberately ignored: the configuration failure that is already propagating is the
                    // error the caller needs to see.
                }
            }
        }
        return channel;
    }

    /**
     * Closes the socket channel of this worker.
     *
     * @throws IOException if an I/O error occurs.
     */
    public final void close () throws IOException {

        socketChannel.close();
    }

    /**
     * Reads one <code>ProtocolDataUnit</code> from the socket channel, validates it against the current connection
     * state and updates the maximum command sequence number of the session and the expected status sequence number
     * of the connection from it.
     *
     * @return The protocol data unit that was read.
     * @throws IOException if an I/O error occurs.
     * @throws InternetSCSIException if the channel is already closed, or if the connection state rejects the
     *             received unit (any violation of the iSCSI-Standard).
     * @throws DigestException if a mismatch of the digest exists.
     */
    public ProtocolDataUnit receiveFromWire () throws DigestException , InternetSCSIException , IOException {

        final ProtocolDataUnit protocolDataUnit = protocolDataUnitFactory.create(
                connection.getSetting(OperationalTextKey.HEADER_DIGEST),
                connection.getSetting(OperationalTextKey.DATA_DIGEST));

        try {
            protocolDataUnit.read(socketChannel);
        } catch (ClosedChannelException e) {
            throw new InternetSCSIException(e);
        }

        // Parameterized logging: the PDU is only rendered when debug output is enabled.
        LOGGER.debug("Receiving this PDU: {}", protocolDataUnit);

        // The state returns null for an acceptable PDU and the reason otherwise.
        final Exception violation = connection.getState().isCorrect(protocolDataUnit);
        if (violation != null) {
            throw new InternetSCSIException(violation);
        }

        LOGGER.trace("Adding PDU to Receiving Queue.");

        final TargetMessageParser parser = (TargetMessageParser) protocolDataUnit.getBasicHeaderSegment().getParser();
        final Session session = connection.getSession();

        // the PDU maxCmdSN is greater than the local maxCmdSN, so we
        // have to update the local one
        if (session.getMaximumCommandSequenceNumber().compareTo(parser.getMaximumCommandSequenceNumber()) < 0) {
            session.setMaximumCommandSequenceNumber(parser.getMaximumCommandSequenceNumber());
        }

        // For PDUs whose parser asks for it, advance the expected status sequence number (ExpStatSN) of the
        // connection, or report a mismatch with the received status sequence number.
        // NOTE(review): the comparison direction (>= 0) and the "- 1" in the message are kept exactly as they
        // were; confirm against the protocol rules that this is the intended check.
        if (parser.incrementSequenceNumber()) {
            if (connection.getExpectedStatusSequenceNumber().compareTo(parser.getStatusSequenceNumber()) >= 0) {
                connection.incrementExpectedStatusSequenceNumber();
            } else {
                LOGGER.error("Status Sequence Number Mismatch (received, expected): {}, {}",
                        parser.getStatusSequenceNumber(),
                        connection.getExpectedStatusSequenceNumber().getValue() - 1);
            }
        }

        return protocolDataUnit;
    }

    /**
     * Sends the given <code>ProtocolDataUnit</code> instance over the socket to the connected iSCSI Target. Before
     * writing, the unit is stamped with the initiator task tag and command sequence number of the session and with
     * the expected status sequence number of the connection. After writing, the command sequence number of the
     * session is incremented if the parser of the unit asks for it.
     *
     * @param unit The <code>ProtocolDataUnit</code> instance to send.
     * @throws InternetSCSIException if any violation of the iSCSI-Standard emerge.
     * @throws IOException if an I/O error occurs.
     * @throws InterruptedException if the current thread is interrupted while waiting. Which of the calls made here
     *             can raise it is not visible in this class; the declaration is kept for callers that handle it.
     */
    public final void sendOverWire (final ProtocolDataUnit unit) throws InternetSCSIException , IOException , InterruptedException {

        final Session session = connection.getSession();

        unit.getBasicHeaderSegment().setInitiatorTaskTag(session.getInitiatorTaskTag());
        final InitiatorMessageParser parser = (InitiatorMessageParser) unit.getBasicHeaderSegment().getParser();
        parser.setCommandSequenceNumber(session.getCommandSequenceNumber());
        parser.setExpectedStatusSequenceNumber(connection.getExpectedStatusSequenceNumber().getValue());

        unit.write(socketChannel);

        // Parameterized logging: the PDU is only rendered when debug output is enabled.
        LOGGER.debug("Sending this PDU: {}", unit);

        // increment the Command Sequence Number
        if (parser.incrementSequenceNumber()) {
            connection.getSession().incrementCommandSequenceNumber();
        }
    }

}