1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 package net.sf.hermesftp.streams;
26
27 import java.io.ByteArrayOutputStream;
28 import java.io.IOException;
29 import java.io.InputStream;
30 import java.util.HashMap;
31 import java.util.Map;
32
33 import org.apache.commons.lang.ArrayUtils;
34
35 /***
36 * Reads a stream formatted in block mode, extracts the data contents and registers restart markers.
37 *
38 * @author Lars Behnke
39 */
40 public class BlockModeInputStream extends InputStream implements RecordReadSupport, BlockModeConstants {
41
42 private static final String UNEXPECTED_END_OF_STREAM = "Unexpected end of stream.";
43
44 private byte[] buffer;
45
46 private int idx;
47
48 private long byteCount;
49
50 private InputStream is;
51
52 private Map<Long, Long> restartMarkers = new HashMap<Long, Long>();
53
54 private byte[] eorMarkerBytes;
55
56 private boolean eof;
57
58 private boolean eor;
59
60 /***
61 * Constructor.
62 *
63 * @param is the input stream.
64 * @param eorMarker The marker bytes for EOR.
65 * @param restartMarkers Hash table to be filled with restart markers.
66 */
67 public BlockModeInputStream(InputStream is, byte[] eorMarker, Map<Long, Long> restartMarkers) {
68 super();
69 this.is = is;
70 if (eorMarker == null) {
71 String lineSep = System.getProperty("line.separator");
72 eorMarker = lineSep.getBytes();
73 }
74 this.eorMarkerBytes = eorMarker;
75 if (restartMarkers == null) {
76 restartMarkers = new HashMap<Long, Long>();
77 }
78 this.restartMarkers = restartMarkers;
79 }
80
81 /***
82 * Constructor.
83 *
84 * @param is The input stream.
85 */
86 public BlockModeInputStream(InputStream is) {
87 this(is, null, null);
88 }
89
90 /***
91 * {@inheritDoc}
92 */
93 public byte[] readRecord() throws IOException {
94 ByteArrayOutputStream baos = new ByteArrayOutputStream();
95 try {
96 while (!(eor && idx == 0)) {
97 int b = read();
98 if (b == -1) {
99 throw new IOException(UNEXPECTED_END_OF_STREAM);
100 }
101 baos.write(b);
102 }
103 } finally {
104 baos.close();
105 }
106 return baos.toByteArray();
107
108 }
109
110 /***
111 * {@inheritDoc}
112 */
113 public int read() throws IOException {
114 if (buffer == null) {
115 if (eof) {
116 return -1;
117 }
118 int descriptor = is.read();
119 int hi = is.read();
120 int lo = is.read();
121 checkHeader(descriptor, hi, lo);
122 int len = hi << 8 | lo;
123 buffer = new byte[len];
124 int realLength = is.read(buffer);
125 if (realLength != len) {
126 throw new IOException(UNEXPECTED_END_OF_STREAM);
127 }
128 eof = (descriptor & BlockModeConstants.DESC_CODE_EOF) > 0;
129 eor = (descriptor & BlockModeConstants.DESC_CODE_EOR) > 0;
130 boolean mark = (descriptor & BlockModeConstants.DESC_CODE_REST) > 0;
131 if (mark) {
132 setRestartMarker();
133 buffer = null;
134 return read();
135 }
136 byteCount += len;
137 if (eor) {
138 buffer = ArrayUtils.addAll(buffer, eorMarkerBytes);
139 }
140 }
141 int result = buffer[idx];
142 idx++;
143 if (idx >= buffer.length) {
144 buffer = null;
145 idx = 0;
146 }
147 return result;
148 }
149
150 private void checkHeader(int descriptor, int hi, int lo) throws IOException {
151 if (descriptor == -1 || hi == -1 || lo == -1) {
152 throw new IOException(UNEXPECTED_END_OF_STREAM);
153 }
154 if ((descriptor & BlockModeConstants.DESC_CODE_ERR) > 0) {
155 throw new IOException("Error flag in descriptor code set.");
156 }
157 }
158
159 private void setRestartMarker() throws IOException {
160 if (buffer.length > 8) {
161 throw new IOException("Marker size exceeds 8 bytes.");
162 }
163 long marker = 0;
164 int len = buffer.length;
165 for (int i = 0; i < len; i++) {
166 marker |= buffer[i] << (8 * (len - i - 1));
167 }
168 restartMarkers.put(new Long(marker), new Long(byteCount));
169 }
170
171 /***
172 * Getter method for the java bean <code>restartMarkers</code>.
173 *
174 * @return Returns the value of the java bean <code>restartMarkers</code>.
175 */
176 public Map<Long, Long> getRestartMarkers() {
177 return restartMarkers;
178 }
179
180 }