View Javadoc

1   /**
2    * Copyright (c) 2012, University of Konstanz, Distributed Systems Group All rights reserved.
3    * 
4    * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
5    * following conditions are met: * Redistributions of source code must retain the above copyright notice, this list of
6    * conditions and the following disclaimer. * Redistributions in binary form must reproduce the above copyright notice,
7    * this list of conditions and the following disclaimer in the documentation and/or other materials provided with the
8    * distribution. * Neither the name of the University of Konstanz nor the names of its contributors may be used to
9    * endorse or promote products derived from this software without specific prior written permission.
10   * 
11   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
12   * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
13   * DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
14   * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
15   * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
16   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
17   * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
18   */
19  package org.jscsi.initiator.connection;
20  
21  
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.nio.channels.ClosedChannelException;
25  import java.nio.channels.SocketChannel;
26  import java.security.DigestException;
27  
28  import org.jscsi.exception.InternetSCSIException;
29  import org.jscsi.parser.InitiatorMessageParser;
30  import org.jscsi.parser.ProtocolDataUnit;
31  import org.jscsi.parser.ProtocolDataUnitFactory;
32  import org.jscsi.parser.TargetMessageParser;
33  import org.jscsi.parser.datasegment.OperationalTextKey;
34  import org.slf4j.Logger;
35  import org.slf4j.LoggerFactory;
36  
37  
38  /**
39   * <h1>SenderWorker</h1>
40   * <p/>
41   * The worker caller to send all the protocol data units over the socket of this connection.
42   * 
43   * @author Volker Wildi
44   */
45  public final class SenderWorker {
46  
47      // --------------------------------------------------------------------------
48      // --------------------------------------------------------------------------
49  
50      /** The logger interface. */
51      private static final Logger LOGGER = LoggerFactory.getLogger(SenderWorker.class);
52  
53      // --------------------------------------------------------------------------
54      // --------------------------------------------------------------------------
55  
56      /** The <code>Connection</code> instance of this worker caller. */
57      private final Connection connection;
58  
59      /**
60       * Non-blocking socket connection to use for the data transfer.
61       */
62      private final SocketChannel socketChannel;
63  
64      /**
65       * Factory class for creating the several <code>ProtocolDataUnit</code> instances.
66       */
67      private final ProtocolDataUnitFactory protocolDataUnitFactory;
68  
69      // --------------------------------------------------------------------------
70      // --------------------------------------------------------------------------
71  
72      /**
73       * Creates a new, empty <code>SenderWorker</code> instance.
74       * 
75       * @param initConnection The reference connection of this worker caller.
76       * @param inetAddress The InetSocketAddress of the Target.
77       * @throws IOException if any IO error occurs.
78       */
79      public SenderWorker (final Connection initConnection, final InetSocketAddress inetAddress) throws IOException {
80  
81          connection = initConnection;
82          socketChannel = SocketChannel.open(inetAddress);
83          socketChannel.socket().setTcpNoDelay(true);
84  
85          protocolDataUnitFactory = new ProtocolDataUnitFactory();
86  
87      }
88  
89      // --------------------------------------------------------------------------
90      // --------------------------------------------------------------------------
91  
92      /**
93       * This method does all the necessary steps, which are needed when a connection should be closed.
94       * 
95       * @throws IOException if an I/O error occurs.
96       */
97      public final void close () throws IOException {
98  
99          socketChannel.close();
100 
101     }
102 
103     // --------------------------------------------------------------------------
104     // --------------------------------------------------------------------------
105 
106     /**
107      * Receives a <code>ProtocolDataUnit</code> from the socket and appends it to the end of the receiving queue of this
108      * connection.
109      * 
110      * @return Queue with the resulting units
111      * @throws IOException if an I/O error occurs.
112      * @throws InternetSCSIException if any violation of the iSCSI-Standard emerge.
113      * @throws DigestException if a mismatch of the digest exists.
114      */
115     public ProtocolDataUnit receiveFromWire () throws DigestException , InternetSCSIException , IOException {
116 
117         final ProtocolDataUnit protocolDataUnit = protocolDataUnitFactory.create(connection.getSetting(OperationalTextKey.HEADER_DIGEST), connection.getSetting(OperationalTextKey.DATA_DIGEST));
118 
119         try {
120             protocolDataUnit.read(socketChannel);
121         } catch (ClosedChannelException e) {
122             throw new InternetSCSIException(e);
123         }
124 
125         LOGGER.debug("Receiving this PDU: " + protocolDataUnit);
126 
127         final Exception isCorrect = connection.getState().isCorrect(protocolDataUnit);
128         if (isCorrect == null) {
129             LOGGER.trace("Adding PDU to Receiving Queue.");
130 
131             final TargetMessageParser parser = (TargetMessageParser) protocolDataUnit.getBasicHeaderSegment().getParser();
132             final Session session = connection.getSession();
133 
134             // the PDU maxCmdSN is greater than the local maxCmdSN, so we
135             // have to update the local one
136             if (session.getMaximumCommandSequenceNumber().compareTo(parser.getMaximumCommandSequenceNumber()) < 0) {
137                 session.setMaximumCommandSequenceNumber(parser.getMaximumCommandSequenceNumber());
138             }
139 
140             // the PDU expCmdSN is greater than the local expCmdSN, so we
141             // have to update the local one
142             if (parser.incrementSequenceNumber()) {
143                 if (connection.getExpectedStatusSequenceNumber().compareTo(parser.getStatusSequenceNumber()) >= 0) {
144                     connection.incrementExpectedStatusSequenceNumber();
145                 } else {
146                     LOGGER.error("Status Sequence Number Mismatch (received, expected): " + parser.getStatusSequenceNumber() + ", " + (connection.getExpectedStatusSequenceNumber().getValue() - 1));
147                 }
148 
149             }
150 
151         } else {
152             throw new InternetSCSIException(isCorrect);
153         }
154         return protocolDataUnit;
155     }
156 
157     // --------------------------------------------------------------------------
158     // --------------------------------------------------------------------------
159 
160     /**
161      * Sends the given <code>ProtocolDataUnit</code> instance over the socket to the connected iSCSI Target.
162      * 
163      * @param unit The <code>ProtocolDataUnit</code> instances to send.
164      * @throws InternetSCSIException if any violation of the iSCSI-Standard emerge.
165      * @throws IOException if an I/O error occurs.
166      * @throws InterruptedException if another caller interrupted the current caller before or while the current caller
167      *             was waiting for a notification. The interrupted status of the current caller is cleared when this
168      *             exception is thrown.
169      */
170     public final void sendOverWire (final ProtocolDataUnit unit) throws InternetSCSIException , IOException , InterruptedException {
171 
172         final Session session = connection.getSession();
173 
174         unit.getBasicHeaderSegment().setInitiatorTaskTag(session.getInitiatorTaskTag());
175         final InitiatorMessageParser parser = (InitiatorMessageParser) unit.getBasicHeaderSegment().getParser();
176         parser.setCommandSequenceNumber(session.getCommandSequenceNumber());
177         parser.setExpectedStatusSequenceNumber(connection.getExpectedStatusSequenceNumber().getValue());
178 
179         unit.write(socketChannel);
180 
181         LOGGER.debug("Sending this PDU: " + unit);
182 
183         // increment the Command Sequence Number
184         if (parser.incrementSequenceNumber()) {
185             connection.getSession().incrementCommandSequenceNumber();
186         }
187 
188     }
189 
190     // --------------------------------------------------------------------------
191     // --------------------------------------------------------------------------
192     // --------------------------------------------------------------------------
193     // --------------------------------------------------------------------------
194 
195 }