/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.io.compress;

import java.util.*;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ReflectionUtils;

28 | /** |
---|
29 | * A factory that will find the correct codec for a given filename. |
---|
30 | */ |
---|
31 | public class CompressionCodecFactory { |
---|
32 | |
---|
33 | public static final Log LOG = |
---|
34 | LogFactory.getLog(CompressionCodecFactory.class.getName()); |
---|
35 | |
---|
36 | /** |
---|
37 | * A map from the reversed filename suffixes to the codecs. |
---|
38 | * This is probably overkill, because the maps should be small, but it |
---|
39 | * automatically supports finding the longest matching suffix. |
---|
40 | */ |
---|
41 | private SortedMap<String, CompressionCodec> codecs = null; |
---|
42 | |
---|
43 | private void addCodec(CompressionCodec codec) { |
---|
44 | String suffix = codec.getDefaultExtension(); |
---|
45 | codecs.put(new StringBuffer(suffix).reverse().toString(), codec); |
---|
46 | } |
---|
47 | |
---|
48 | /** |
---|
49 | * Print the extension map out as a string. |
---|
50 | */ |
---|
51 | public String toString() { |
---|
52 | StringBuffer buf = new StringBuffer(); |
---|
53 | Iterator<Map.Entry<String, CompressionCodec>> itr = |
---|
54 | codecs.entrySet().iterator(); |
---|
55 | buf.append("{ "); |
---|
56 | if (itr.hasNext()) { |
---|
57 | Map.Entry<String, CompressionCodec> entry = itr.next(); |
---|
58 | buf.append(entry.getKey()); |
---|
59 | buf.append(": "); |
---|
60 | buf.append(entry.getValue().getClass().getName()); |
---|
61 | while (itr.hasNext()) { |
---|
62 | entry = itr.next(); |
---|
63 | buf.append(", "); |
---|
64 | buf.append(entry.getKey()); |
---|
65 | buf.append(": "); |
---|
66 | buf.append(entry.getValue().getClass().getName()); |
---|
67 | } |
---|
68 | } |
---|
69 | buf.append(" }"); |
---|
70 | return buf.toString(); |
---|
71 | } |
---|
72 | |
---|
73 | /** |
---|
74 | * Get the list of codecs listed in the configuration |
---|
75 | * @param conf the configuration to look in |
---|
76 | * @return a list of the Configuration classes or null if the attribute |
---|
77 | * was not set |
---|
78 | */ |
---|
79 | public static List<Class<? extends CompressionCodec>> getCodecClasses(Configuration conf) { |
---|
80 | String codecsString = conf.get("io.compression.codecs"); |
---|
81 | if (codecsString != null) { |
---|
82 | List<Class<? extends CompressionCodec>> result |
---|
83 | = new ArrayList<Class<? extends CompressionCodec>>(); |
---|
84 | StringTokenizer codecSplit = new StringTokenizer(codecsString, ","); |
---|
85 | while (codecSplit.hasMoreElements()) { |
---|
86 | String codecSubstring = codecSplit.nextToken(); |
---|
87 | if (codecSubstring.length() != 0) { |
---|
88 | try { |
---|
89 | Class<?> cls = conf.getClassByName(codecSubstring); |
---|
90 | if (!CompressionCodec.class.isAssignableFrom(cls)) { |
---|
91 | throw new IllegalArgumentException("Class " + codecSubstring + |
---|
92 | " is not a CompressionCodec"); |
---|
93 | } |
---|
94 | result.add(cls.asSubclass(CompressionCodec.class)); |
---|
95 | } catch (ClassNotFoundException ex) { |
---|
96 | throw new IllegalArgumentException("Compression codec " + |
---|
97 | codecSubstring + " not found.", |
---|
98 | ex); |
---|
99 | } |
---|
100 | } |
---|
101 | } |
---|
102 | return result; |
---|
103 | } else { |
---|
104 | return null; |
---|
105 | } |
---|
106 | } |
---|
107 | |
---|
108 | /** |
---|
109 | * Sets a list of codec classes in the configuration. |
---|
110 | * @param conf the configuration to modify |
---|
111 | * @param classes the list of classes to set |
---|
112 | */ |
---|
113 | public static void setCodecClasses(Configuration conf, |
---|
114 | List<Class> classes) { |
---|
115 | StringBuffer buf = new StringBuffer(); |
---|
116 | Iterator<Class> itr = classes.iterator(); |
---|
117 | if (itr.hasNext()) { |
---|
118 | Class cls = itr.next(); |
---|
119 | buf.append(cls.getName()); |
---|
120 | while(itr.hasNext()) { |
---|
121 | buf.append(','); |
---|
122 | buf.append(itr.next().getName()); |
---|
123 | } |
---|
124 | } |
---|
125 | conf.set("io.compression.codecs", buf.toString()); |
---|
126 | } |
---|
127 | |
---|
128 | /** |
---|
129 | * Find the codecs specified in the config value io.compression.codecs |
---|
130 | * and register them. Defaults to gzip and zip. |
---|
131 | */ |
---|
132 | public CompressionCodecFactory(Configuration conf) { |
---|
133 | codecs = new TreeMap<String, CompressionCodec>(); |
---|
134 | List<Class<? extends CompressionCodec>> codecClasses = getCodecClasses(conf); |
---|
135 | if (codecClasses == null) { |
---|
136 | addCodec(new GzipCodec()); |
---|
137 | addCodec(new DefaultCodec()); |
---|
138 | } else { |
---|
139 | Iterator<Class<? extends CompressionCodec>> itr = codecClasses.iterator(); |
---|
140 | while (itr.hasNext()) { |
---|
141 | CompressionCodec codec = ReflectionUtils.newInstance(itr.next(), conf); |
---|
142 | addCodec(codec); |
---|
143 | } |
---|
144 | } |
---|
145 | } |
---|
146 | |
---|
147 | /** |
---|
148 | * Find the relevant compression codec for the given file based on its |
---|
149 | * filename suffix. |
---|
150 | * @param file the filename to check |
---|
151 | * @return the codec object |
---|
152 | */ |
---|
153 | public CompressionCodec getCodec(Path file) { |
---|
154 | CompressionCodec result = null; |
---|
155 | if (codecs != null) { |
---|
156 | String filename = file.getName(); |
---|
157 | String reversedFilename = new StringBuffer(filename).reverse().toString(); |
---|
158 | SortedMap<String, CompressionCodec> subMap = |
---|
159 | codecs.headMap(reversedFilename); |
---|
160 | if (!subMap.isEmpty()) { |
---|
161 | String potentialSuffix = subMap.lastKey(); |
---|
162 | if (reversedFilename.startsWith(potentialSuffix)) { |
---|
163 | result = codecs.get(potentialSuffix); |
---|
164 | } |
---|
165 | } |
---|
166 | } |
---|
167 | return result; |
---|
168 | } |
---|
169 | |
---|
170 | /** |
---|
171 | * Removes a suffix from a filename, if it has it. |
---|
172 | * @param filename the filename to strip |
---|
173 | * @param suffix the suffix to remove |
---|
174 | * @return the shortened filename |
---|
175 | */ |
---|
176 | public static String removeSuffix(String filename, String suffix) { |
---|
177 | if (filename.endsWith(suffix)) { |
---|
178 | return filename.substring(0, filename.length() - suffix.length()); |
---|
179 | } |
---|
180 | return filename; |
---|
181 | } |
---|
182 | |
---|
183 | /** |
---|
184 | * A little test program. |
---|
185 | * @param args |
---|
186 | */ |
---|
187 | public static void main(String[] args) throws Exception { |
---|
188 | Configuration conf = new Configuration(); |
---|
189 | CompressionCodecFactory factory = new CompressionCodecFactory(conf); |
---|
190 | boolean encode = false; |
---|
191 | for(int i=0; i < args.length; ++i) { |
---|
192 | if ("-in".equals(args[i])) { |
---|
193 | encode = true; |
---|
194 | } else if ("-out".equals(args[i])) { |
---|
195 | encode = false; |
---|
196 | } else { |
---|
197 | CompressionCodec codec = factory.getCodec(new Path(args[i])); |
---|
198 | if (codec == null) { |
---|
199 | System.out.println("Codec for " + args[i] + " not found."); |
---|
200 | } else { |
---|
201 | if (encode) { |
---|
202 | CompressionOutputStream out = |
---|
203 | codec.createOutputStream(new java.io.FileOutputStream(args[i])); |
---|
204 | byte[] buffer = new byte[100]; |
---|
205 | String inFilename = removeSuffix(args[i], |
---|
206 | codec.getDefaultExtension()); |
---|
207 | java.io.InputStream in = new java.io.FileInputStream(inFilename); |
---|
208 | int len = in.read(buffer); |
---|
209 | while (len > 0) { |
---|
210 | out.write(buffer, 0, len); |
---|
211 | len = in.read(buffer); |
---|
212 | } |
---|
213 | in.close(); |
---|
214 | out.close(); |
---|
215 | } else { |
---|
216 | CompressionInputStream in = |
---|
217 | codec.createInputStream(new java.io.FileInputStream(args[i])); |
---|
218 | byte[] buffer = new byte[100]; |
---|
219 | int len = in.read(buffer); |
---|
220 | while (len > 0) { |
---|
221 | System.out.write(buffer, 0, len); |
---|
222 | len = in.read(buffer); |
---|
223 | } |
---|
224 | in.close(); |
---|
225 | } |
---|
226 | } |
---|
227 | } |
---|
228 | } |
---|
229 | } |
---|
230 | } |
---|