1 | /** |
---|
2 | * Licensed to the Apache Software Foundation (ASF) under one |
---|
3 | * or more contributor license agreements. See the NOTICE file |
---|
4 | * distributed with this work for additional information |
---|
5 | * regarding copyright ownership. The ASF licenses this file |
---|
6 | * to you under the Apache License, Version 2.0 (the |
---|
7 | * "License"); you may not use this file except in compliance |
---|
8 | * with the License. You may obtain a copy of the License at |
---|
9 | * |
---|
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
11 | * |
---|
12 | * Unless required by applicable law or agreed to in writing, software |
---|
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
---|
15 | * See the License for the specific language governing permissions and |
---|
16 | * limitations under the License. |
---|
17 | */ |
---|
18 | package org.apache.hadoop.mapred; |
---|
19 | |
---|
20 | import java.io.IOException; |
---|
21 | import java.util.ArrayList; |
---|
22 | import java.util.Collections; |
---|
23 | import java.util.Comparator; |
---|
24 | import java.util.List; |
---|
25 | |
---|
26 | import org.apache.commons.logging.Log; |
---|
27 | import org.apache.commons.logging.LogFactory; |
---|
28 | import org.apache.hadoop.conf.Configuration; |
---|
29 | import org.apache.hadoop.fs.FSDataInputStream; |
---|
30 | import org.apache.hadoop.fs.ChecksumFileSystem; |
---|
31 | import org.apache.hadoop.fs.FileSystem; |
---|
32 | import org.apache.hadoop.fs.LocalDirAllocator; |
---|
33 | import org.apache.hadoop.fs.Path; |
---|
34 | import org.apache.hadoop.io.DataInputBuffer; |
---|
35 | import org.apache.hadoop.io.RawComparator; |
---|
36 | import org.apache.hadoop.io.compress.CompressionCodec; |
---|
37 | import org.apache.hadoop.mapred.IFile.Reader; |
---|
38 | import org.apache.hadoop.mapred.IFile.Writer; |
---|
39 | import org.apache.hadoop.util.PriorityQueue; |
---|
40 | import org.apache.hadoop.util.Progress; |
---|
41 | import org.apache.hadoop.util.Progressable; |
---|
42 | |
---|
43 | class Merger { |
---|
44 | private static final Log LOG = LogFactory.getLog(Merger.class); |
---|
45 | |
---|
46 | // Local directories |
---|
47 | private static LocalDirAllocator lDirAlloc = |
---|
48 | new LocalDirAllocator("mapred.local.dir"); |
---|
49 | |
---|
50 | public static <K extends Object, V extends Object> |
---|
51 | RawKeyValueIterator merge(Configuration conf, FileSystem fs, |
---|
52 | Class<K> keyClass, Class<V> valueClass, |
---|
53 | CompressionCodec codec, |
---|
54 | Path[] inputs, boolean deleteInputs, |
---|
55 | int mergeFactor, Path tmpDir, |
---|
56 | RawComparator<K> comparator, Progressable reporter, |
---|
57 | Counters.Counter readsCounter, |
---|
58 | Counters.Counter writesCounter) |
---|
59 | throws IOException { |
---|
60 | return |
---|
61 | new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator, |
---|
62 | reporter).merge(keyClass, valueClass, |
---|
63 | mergeFactor, tmpDir, |
---|
64 | readsCounter, writesCounter); |
---|
65 | } |
---|
66 | |
---|
67 | public static <K extends Object, V extends Object> |
---|
68 | RawKeyValueIterator merge(Configuration conf, FileSystem fs, |
---|
69 | Class<K> keyClass, Class<V> valueClass, |
---|
70 | CompressionCodec codec, |
---|
71 | List<Segment<K, V>> segments, |
---|
72 | int mergeFactor, Path tmpDir, |
---|
73 | RawComparator<K> comparator, Progressable reporter, |
---|
74 | Counters.Counter readsCounter, |
---|
75 | Counters.Counter writesCounter) |
---|
76 | throws IOException { |
---|
77 | return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter, |
---|
78 | false, codec).merge(keyClass, valueClass, |
---|
79 | mergeFactor, tmpDir, |
---|
80 | readsCounter, writesCounter); |
---|
81 | |
---|
82 | } |
---|
83 | |
---|
84 | public static <K extends Object, V extends Object> |
---|
85 | RawKeyValueIterator merge(Configuration conf, FileSystem fs, |
---|
86 | Class<K> keyClass, Class<V> valueClass, |
---|
87 | List<Segment<K, V>> segments, |
---|
88 | int mergeFactor, Path tmpDir, |
---|
89 | RawComparator<K> comparator, Progressable reporter, |
---|
90 | Counters.Counter readsCounter, |
---|
91 | Counters.Counter writesCounter) |
---|
92 | throws IOException { |
---|
93 | return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir, |
---|
94 | comparator, reporter, false, readsCounter, writesCounter); |
---|
95 | } |
---|
96 | |
---|
97 | public static <K extends Object, V extends Object> |
---|
98 | RawKeyValueIterator merge(Configuration conf, FileSystem fs, |
---|
99 | Class<K> keyClass, Class<V> valueClass, |
---|
100 | List<Segment<K, V>> segments, |
---|
101 | int mergeFactor, Path tmpDir, |
---|
102 | RawComparator<K> comparator, Progressable reporter, |
---|
103 | boolean sortSegments, |
---|
104 | Counters.Counter readsCounter, |
---|
105 | Counters.Counter writesCounter) |
---|
106 | throws IOException { |
---|
107 | return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter, |
---|
108 | sortSegments).merge(keyClass, valueClass, |
---|
109 | mergeFactor, tmpDir, |
---|
110 | readsCounter, writesCounter); |
---|
111 | } |
---|
112 | |
---|
113 | static <K extends Object, V extends Object> |
---|
114 | RawKeyValueIterator merge(Configuration conf, FileSystem fs, |
---|
115 | Class<K> keyClass, Class<V> valueClass, |
---|
116 | List<Segment<K, V>> segments, |
---|
117 | int mergeFactor, int inMemSegments, Path tmpDir, |
---|
118 | RawComparator<K> comparator, Progressable reporter, |
---|
119 | boolean sortSegments, |
---|
120 | Counters.Counter readsCounter, |
---|
121 | Counters.Counter writesCounter) |
---|
122 | throws IOException { |
---|
123 | return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter, |
---|
124 | sortSegments).merge(keyClass, valueClass, |
---|
125 | mergeFactor, inMemSegments, |
---|
126 | tmpDir, |
---|
127 | readsCounter, writesCounter); |
---|
128 | } |
---|
129 | |
---|
130 | |
---|
131 | static <K extends Object, V extends Object> |
---|
132 | RawKeyValueIterator merge(Configuration conf, FileSystem fs, |
---|
133 | Class<K> keyClass, Class<V> valueClass, |
---|
134 | CompressionCodec codec, |
---|
135 | List<Segment<K, V>> segments, |
---|
136 | int mergeFactor, int inMemSegments, Path tmpDir, |
---|
137 | RawComparator<K> comparator, Progressable reporter, |
---|
138 | boolean sortSegments, |
---|
139 | Counters.Counter readsCounter, |
---|
140 | Counters.Counter writesCounter) |
---|
141 | throws IOException { |
---|
142 | return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter, |
---|
143 | sortSegments, codec).merge(keyClass, valueClass, |
---|
144 | mergeFactor, inMemSegments, |
---|
145 | tmpDir, |
---|
146 | readsCounter, writesCounter); |
---|
147 | } |
---|
148 | |
---|
149 | public static <K extends Object, V extends Object> |
---|
150 | void writeFile(RawKeyValueIterator records, Writer<K, V> writer, |
---|
151 | Progressable progressable, Configuration conf) |
---|
152 | throws IOException { |
---|
153 | long progressBar = conf.getLong("mapred.merge.recordsBeforeProgress", |
---|
154 | 10000); |
---|
155 | long recordCtr = 0; |
---|
156 | while(records.next()) { |
---|
157 | writer.append(records.getKey(), records.getValue()); |
---|
158 | |
---|
159 | if (((recordCtr++) % progressBar) == 0) { |
---|
160 | progressable.progress(); |
---|
161 | } |
---|
162 | } |
---|
163 | } |
---|
164 | |
---|
165 | public static class Segment<K extends Object, V extends Object> { |
---|
166 | Reader<K, V> reader = null; |
---|
167 | DataInputBuffer key = new DataInputBuffer(); |
---|
168 | DataInputBuffer value = new DataInputBuffer(); |
---|
169 | |
---|
170 | Configuration conf = null; |
---|
171 | FileSystem fs = null; |
---|
172 | Path file = null; |
---|
173 | boolean preserve = false; |
---|
174 | CompressionCodec codec = null; |
---|
175 | long segmentOffset = 0; |
---|
176 | long segmentLength = -1; |
---|
177 | |
---|
178 | public Segment(Configuration conf, FileSystem fs, Path file, |
---|
179 | CompressionCodec codec, boolean preserve) throws IOException { |
---|
180 | this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve); |
---|
181 | } |
---|
182 | |
---|
183 | public Segment(Configuration conf, FileSystem fs, Path file, |
---|
184 | long segmentOffset, long segmentLength, CompressionCodec codec, |
---|
185 | boolean preserve) throws IOException { |
---|
186 | this.conf = conf; |
---|
187 | this.fs = fs; |
---|
188 | this.file = file; |
---|
189 | this.codec = codec; |
---|
190 | this.preserve = preserve; |
---|
191 | |
---|
192 | this.segmentOffset = segmentOffset; |
---|
193 | this.segmentLength = segmentLength; |
---|
194 | } |
---|
195 | |
---|
196 | public Segment(Reader<K, V> reader, boolean preserve) { |
---|
197 | this.reader = reader; |
---|
198 | this.preserve = preserve; |
---|
199 | |
---|
200 | this.segmentLength = reader.getLength(); |
---|
201 | } |
---|
202 | |
---|
203 | private void init(Counters.Counter readsCounter) throws IOException { |
---|
204 | if (reader == null) { |
---|
205 | FSDataInputStream in = fs.open(file); |
---|
206 | in.seek(segmentOffset); |
---|
207 | reader = new Reader<K, V>(conf, in, segmentLength, codec, readsCounter); |
---|
208 | } |
---|
209 | } |
---|
210 | |
---|
211 | DataInputBuffer getKey() { return key; } |
---|
212 | DataInputBuffer getValue() { return value; } |
---|
213 | |
---|
214 | long getLength() { |
---|
215 | return (reader == null) ? |
---|
216 | segmentLength : reader.getLength(); |
---|
217 | } |
---|
218 | |
---|
219 | boolean next() throws IOException { |
---|
220 | return reader.next(key, value); |
---|
221 | } |
---|
222 | |
---|
223 | void close() throws IOException { |
---|
224 | reader.close(); |
---|
225 | |
---|
226 | if (!preserve && fs != null) { |
---|
227 | fs.delete(file, false); |
---|
228 | } |
---|
229 | } |
---|
230 | |
---|
231 | public long getPosition() throws IOException { |
---|
232 | return reader.getPosition(); |
---|
233 | } |
---|
234 | } |
---|
235 | |
---|
236 | private static class MergeQueue<K extends Object, V extends Object> |
---|
237 | extends PriorityQueue<Segment<K, V>> implements RawKeyValueIterator { |
---|
238 | Configuration conf; |
---|
239 | FileSystem fs; |
---|
240 | CompressionCodec codec; |
---|
241 | |
---|
242 | List<Segment<K, V>> segments = new ArrayList<Segment<K,V>>(); |
---|
243 | |
---|
244 | RawComparator<K> comparator; |
---|
245 | |
---|
246 | private long totalBytesProcessed; |
---|
247 | private float progPerByte; |
---|
248 | private Progress mergeProgress = new Progress(); |
---|
249 | |
---|
250 | Progressable reporter; |
---|
251 | |
---|
252 | DataInputBuffer key; |
---|
253 | DataInputBuffer value; |
---|
254 | |
---|
255 | Segment<K, V> minSegment; |
---|
256 | Comparator<Segment<K, V>> segmentComparator = |
---|
257 | new Comparator<Segment<K, V>>() { |
---|
258 | public int compare(Segment<K, V> o1, Segment<K, V> o2) { |
---|
259 | if (o1.getLength() == o2.getLength()) { |
---|
260 | return 0; |
---|
261 | } |
---|
262 | |
---|
263 | return o1.getLength() < o2.getLength() ? -1 : 1; |
---|
264 | } |
---|
265 | }; |
---|
266 | |
---|
267 | |
---|
268 | public MergeQueue(Configuration conf, FileSystem fs, |
---|
269 | Path[] inputs, boolean deleteInputs, |
---|
270 | CompressionCodec codec, RawComparator<K> comparator, |
---|
271 | Progressable reporter) |
---|
272 | throws IOException { |
---|
273 | this.conf = conf; |
---|
274 | this.fs = fs; |
---|
275 | this.codec = codec; |
---|
276 | this.comparator = comparator; |
---|
277 | this.reporter = reporter; |
---|
278 | |
---|
279 | for (Path file : inputs) { |
---|
280 | segments.add(new Segment<K, V>(conf, fs, file, codec, !deleteInputs)); |
---|
281 | } |
---|
282 | |
---|
283 | // Sort segments on file-lengths |
---|
284 | Collections.sort(segments, segmentComparator); |
---|
285 | } |
---|
286 | |
---|
287 | public MergeQueue(Configuration conf, FileSystem fs, |
---|
288 | List<Segment<K, V>> segments, RawComparator<K> comparator, |
---|
289 | Progressable reporter) { |
---|
290 | this(conf, fs, segments, comparator, reporter, false); |
---|
291 | } |
---|
292 | |
---|
293 | public MergeQueue(Configuration conf, FileSystem fs, |
---|
294 | List<Segment<K, V>> segments, RawComparator<K> comparator, |
---|
295 | Progressable reporter, boolean sortSegments) { |
---|
296 | this.conf = conf; |
---|
297 | this.fs = fs; |
---|
298 | this.comparator = comparator; |
---|
299 | this.segments = segments; |
---|
300 | this.reporter = reporter; |
---|
301 | if (sortSegments) { |
---|
302 | Collections.sort(segments, segmentComparator); |
---|
303 | } |
---|
304 | } |
---|
305 | |
---|
306 | public MergeQueue(Configuration conf, FileSystem fs, |
---|
307 | List<Segment<K, V>> segments, RawComparator<K> comparator, |
---|
308 | Progressable reporter, boolean sortSegments, CompressionCodec codec) { |
---|
309 | this(conf, fs, segments, comparator, reporter, sortSegments); |
---|
310 | this.codec = codec; |
---|
311 | } |
---|
312 | |
---|
313 | public void close() throws IOException { |
---|
314 | Segment<K, V> segment; |
---|
315 | while((segment = pop()) != null) { |
---|
316 | segment.close(); |
---|
317 | } |
---|
318 | } |
---|
319 | |
---|
320 | public DataInputBuffer getKey() throws IOException { |
---|
321 | return key; |
---|
322 | } |
---|
323 | |
---|
324 | public DataInputBuffer getValue() throws IOException { |
---|
325 | return value; |
---|
326 | } |
---|
327 | |
---|
328 | private void adjustPriorityQueue(Segment<K, V> reader) throws IOException{ |
---|
329 | long startPos = reader.getPosition(); |
---|
330 | boolean hasNext = reader.next(); |
---|
331 | long endPos = reader.getPosition(); |
---|
332 | totalBytesProcessed += endPos - startPos; |
---|
333 | mergeProgress.set(totalBytesProcessed * progPerByte); |
---|
334 | if (hasNext) { |
---|
335 | adjustTop(); |
---|
336 | } else { |
---|
337 | pop(); |
---|
338 | reader.close(); |
---|
339 | } |
---|
340 | } |
---|
341 | |
---|
342 | public boolean next() throws IOException { |
---|
343 | if (size() == 0) |
---|
344 | return false; |
---|
345 | |
---|
346 | if (minSegment != null) { |
---|
347 | //minSegment is non-null for all invocations of next except the first |
---|
348 | //one. For the first invocation, the priority queue is ready for use |
---|
349 | //but for the subsequent invocations, first adjust the queue |
---|
350 | adjustPriorityQueue(minSegment); |
---|
351 | if (size() == 0) { |
---|
352 | minSegment = null; |
---|
353 | return false; |
---|
354 | } |
---|
355 | } |
---|
356 | minSegment = top(); |
---|
357 | |
---|
358 | key = minSegment.getKey(); |
---|
359 | value = minSegment.getValue(); |
---|
360 | |
---|
361 | return true; |
---|
362 | } |
---|
363 | |
---|
364 | @SuppressWarnings("unchecked") |
---|
365 | protected boolean lessThan(Object a, Object b) { |
---|
366 | DataInputBuffer key1 = ((Segment<K, V>)a).getKey(); |
---|
367 | DataInputBuffer key2 = ((Segment<K, V>)b).getKey(); |
---|
368 | int s1 = key1.getPosition(); |
---|
369 | int l1 = key1.getLength() - s1; |
---|
370 | int s2 = key2.getPosition(); |
---|
371 | int l2 = key2.getLength() - s2; |
---|
372 | |
---|
373 | return comparator.compare(key1.getData(), s1, l1, key2.getData(), s2, l2) < 0; |
---|
374 | } |
---|
375 | |
---|
376 | public RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass, |
---|
377 | int factor, Path tmpDir, |
---|
378 | Counters.Counter readsCounter, |
---|
379 | Counters.Counter writesCounter) |
---|
380 | throws IOException { |
---|
381 | return merge(keyClass, valueClass, factor, 0, tmpDir, |
---|
382 | readsCounter, writesCounter); |
---|
383 | } |
---|
384 | |
---|
385 | RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass, |
---|
386 | int factor, int inMem, Path tmpDir, |
---|
387 | Counters.Counter readsCounter, |
---|
388 | Counters.Counter writesCounter) |
---|
389 | throws IOException { |
---|
390 | LOG.info("Merging " + segments.size() + " sorted segments"); |
---|
391 | |
---|
392 | //create the MergeStreams from the sorted map created in the constructor |
---|
393 | //and dump the final output to a file |
---|
394 | int numSegments = segments.size(); |
---|
395 | int origFactor = factor; |
---|
396 | int passNo = 1; |
---|
397 | do { |
---|
398 | //get the factor for this pass of merge. We assume in-memory segments |
---|
399 | //are the first entries in the segment list and that the pass factor |
---|
400 | //doesn't apply to them |
---|
401 | factor = getPassFactor(factor, passNo, numSegments - inMem); |
---|
402 | if (1 == passNo) { |
---|
403 | factor += inMem; |
---|
404 | } |
---|
405 | List<Segment<K, V>> segmentsToMerge = |
---|
406 | new ArrayList<Segment<K, V>>(); |
---|
407 | int segmentsConsidered = 0; |
---|
408 | int numSegmentsToConsider = factor; |
---|
409 | long startBytes = 0; // starting bytes of segments of this merge |
---|
410 | while (true) { |
---|
411 | //extract the smallest 'factor' number of segments |
---|
412 | //Call cleanup on the empty segments (no key/value data) |
---|
413 | List<Segment<K, V>> mStream = |
---|
414 | getSegmentDescriptors(numSegmentsToConsider); |
---|
415 | for (Segment<K, V> segment : mStream) { |
---|
416 | // Initialize the segment at the last possible moment; |
---|
417 | // this helps in ensuring we don't use buffers until we need them |
---|
418 | segment.init(readsCounter); |
---|
419 | long startPos = segment.getPosition(); |
---|
420 | boolean hasNext = segment.next(); |
---|
421 | long endPos = segment.getPosition(); |
---|
422 | startBytes += endPos - startPos; |
---|
423 | |
---|
424 | if (hasNext) { |
---|
425 | segmentsToMerge.add(segment); |
---|
426 | segmentsConsidered++; |
---|
427 | } |
---|
428 | else { |
---|
429 | segment.close(); |
---|
430 | numSegments--; //we ignore this segment for the merge |
---|
431 | } |
---|
432 | } |
---|
433 | //if we have the desired number of segments |
---|
434 | //or looked at all available segments, we break |
---|
435 | if (segmentsConsidered == factor || |
---|
436 | segments.size() == 0) { |
---|
437 | break; |
---|
438 | } |
---|
439 | |
---|
440 | numSegmentsToConsider = factor - segmentsConsidered; |
---|
441 | } |
---|
442 | |
---|
443 | //feed the streams to the priority queue |
---|
444 | initialize(segmentsToMerge.size()); |
---|
445 | clear(); |
---|
446 | for (Segment<K, V> segment : segmentsToMerge) { |
---|
447 | put(segment); |
---|
448 | } |
---|
449 | |
---|
450 | //if we have lesser number of segments remaining, then just return the |
---|
451 | //iterator, else do another single level merge |
---|
452 | if (numSegments <= factor) { |
---|
453 | // Reset totalBytesProcessed to track the progress of the final merge. |
---|
454 | // This is considered the progress of the reducePhase, the 3rd phase |
---|
455 | // of reduce task. Currently totalBytesProcessed is not used in sort |
---|
456 | // phase of reduce task(i.e. when intermediate merges happen). |
---|
457 | totalBytesProcessed = startBytes; |
---|
458 | |
---|
459 | //calculate the length of the remaining segments. Required for |
---|
460 | //calculating the merge progress |
---|
461 | long totalBytes = 0; |
---|
462 | for (int i = 0; i < segmentsToMerge.size(); i++) { |
---|
463 | totalBytes += segmentsToMerge.get(i).getLength(); |
---|
464 | } |
---|
465 | if (totalBytes != 0) //being paranoid |
---|
466 | progPerByte = 1.0f / (float)totalBytes; |
---|
467 | |
---|
468 | if (totalBytes != 0) |
---|
469 | mergeProgress.set(totalBytesProcessed * progPerByte); |
---|
470 | else |
---|
471 | mergeProgress.set(1.0f); // Last pass and no segments left - we're done |
---|
472 | |
---|
473 | LOG.info("Down to the last merge-pass, with " + numSegments + |
---|
474 | " segments left of total size: " + totalBytes + " bytes"); |
---|
475 | return this; |
---|
476 | } else { |
---|
477 | LOG.info("Merging " + segmentsToMerge.size() + |
---|
478 | " intermediate segments out of a total of " + |
---|
479 | (segments.size()+segmentsToMerge.size())); |
---|
480 | |
---|
481 | //we want to spread the creation of temp files on multiple disks if |
---|
482 | //available under the space constraints |
---|
483 | long approxOutputSize = 0; |
---|
484 | for (Segment<K, V> s : segmentsToMerge) { |
---|
485 | approxOutputSize += s.getLength() + |
---|
486 | ChecksumFileSystem.getApproxChkSumLength( |
---|
487 | s.getLength()); |
---|
488 | } |
---|
489 | Path tmpFilename = |
---|
490 | new Path(tmpDir, "intermediate").suffix("." + passNo); |
---|
491 | |
---|
492 | Path outputFile = lDirAlloc.getLocalPathForWrite( |
---|
493 | tmpFilename.toString(), |
---|
494 | approxOutputSize, conf); |
---|
495 | |
---|
496 | Writer<K, V> writer = |
---|
497 | new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass, codec, |
---|
498 | writesCounter); |
---|
499 | writeFile(this, writer, reporter, conf); |
---|
500 | writer.close(); |
---|
501 | |
---|
502 | //we finished one single level merge; now clean up the priority |
---|
503 | //queue |
---|
504 | this.close(); |
---|
505 | |
---|
506 | // Add the newly create segment to the list of segments to be merged |
---|
507 | Segment<K, V> tempSegment = |
---|
508 | new Segment<K, V>(conf, fs, outputFile, codec, false); |
---|
509 | segments.add(tempSegment); |
---|
510 | numSegments = segments.size(); |
---|
511 | Collections.sort(segments, segmentComparator); |
---|
512 | |
---|
513 | passNo++; |
---|
514 | } |
---|
515 | //we are worried about only the first pass merge factor. So reset the |
---|
516 | //factor to what it originally was |
---|
517 | factor = origFactor; |
---|
518 | } while(true); |
---|
519 | } |
---|
520 | |
---|
521 | /** |
---|
522 | * Determine the number of segments to merge in a given pass. Assuming more |
---|
523 | * than factor segments, the first pass should attempt to bring the total |
---|
524 | * number of segments - 1 to be divisible by the factor - 1 (each pass |
---|
525 | * takes X segments and produces 1) to minimize the number of merges. |
---|
526 | */ |
---|
527 | private int getPassFactor(int factor, int passNo, int numSegments) { |
---|
528 | if (passNo > 1 || numSegments <= factor || factor == 1) |
---|
529 | return factor; |
---|
530 | int mod = (numSegments - 1) % (factor - 1); |
---|
531 | if (mod == 0) |
---|
532 | return factor; |
---|
533 | return mod + 1; |
---|
534 | } |
---|
535 | |
---|
536 | /** Return (& remove) the requested number of segment descriptors from the |
---|
537 | * sorted map. |
---|
538 | */ |
---|
539 | private List<Segment<K, V>> getSegmentDescriptors(int numDescriptors) { |
---|
540 | if (numDescriptors > segments.size()) { |
---|
541 | List<Segment<K, V>> subList = new ArrayList<Segment<K,V>>(segments); |
---|
542 | segments.clear(); |
---|
543 | return subList; |
---|
544 | } |
---|
545 | |
---|
546 | List<Segment<K, V>> subList = |
---|
547 | new ArrayList<Segment<K,V>>(segments.subList(0, numDescriptors)); |
---|
548 | for (int i=0; i < numDescriptors; ++i) { |
---|
549 | segments.remove(0); |
---|
550 | } |
---|
551 | return subList; |
---|
552 | } |
---|
553 | |
---|
554 | public Progress getProgress() { |
---|
555 | return mergeProgress; |
---|
556 | } |
---|
557 | |
---|
558 | } |
---|
559 | } |
---|