Merge Multiple Sorted Files
Merge Multiple Sorted Files

Efficiently merge the contents from an arbitrarily long list of named files.

Notes
Very nice amortized optimization. Rather then trying to handle everything in one large merge, a "sufficiently" large number for any single case is selected. The task is the repeated as necessary. In the worst case, this adds only 7% (1/16) more merge steps then the optimimum case.

Define size of merge group (NMERGE)
Constant definition
Notes
Changing this value changes the number of files used in a single merge pass. Changes to this value can optimize IO traffic for the actual IO system.
Segment Source
 162: /* During the merge phase, the number of files to merge at once. */
 163: #define NMERGE 16
 164: 

Define size of merge buffer (mergealloc)
Constant definition
Segment Source
 169: /* Initial buffer size for in core merge buffers.  Bear in mind that
 170:    up to NMERGE * mergealloc bytes may be allocated for merge buffers. */
 171: static int mergealloc =  16 * 1024;
 172: 

Define default line length (linelength)
Constant definition
Segment Source
 173: /* Guess of average line length. */
 174: static int linelength = 30;
 175: 

Define mergefps()
Function definition
Segment Source
1288: /* Merge lines from FPS onto OFP.  NFPS cannot be greater than NMERGE.
1289:    Close FPS before returning. */
1290: 
1291: static void
1292: mergefps (FILE **fps, register int nfps, FILE *ofp)
1293: {
1294:   struct buffer buffer[NMERGE]; /* Input buffers for each file. */
1295:   struct lines lines[NMERGE];   /* Line tables for each buffer. */
1296:   struct line saved;            /* Saved line for unique check. */
1297:   int savedflag = 0;            /* True if there is a saved line. */
1298:   int savealloc;                /* Size allocated for the saved line. */
1299:   int cur[NMERGE];              /* Current line in each line table. */
1300:   int ord[NMERGE];              /* Table representing a permutation of fps,
1301:                                    such that lines[ord[0]].lines[cur[ord[0]]]
1302:                                    is the smallest line and will be next
1303:                                    output. */
1304:   register int i, j, t;
1305: 
1306: #ifdef lint  /* Suppress `used before initialized' warning.  */
1307:   savealloc = 0;
1308: #endif
1309: 
1310:   /* Allocate space for a saved line if necessary. */
1311:   if (unique)
1312:     {
1313:       savealloc = linelength;
1314:       saved.text = xmalloc (savealloc);
1315:     }
1316: 
1317:   /* Read initial lines from each input file. */
1318:   for (i = 0; i < nfps; ++i)
1319:     {
1320:       initbuf (&buffer[i], mergealloc);
1321:       /* If a file is empty, eliminate it from future consideration. */
1322:       while (i < nfps && !fillbuf (&buffer[i], fps[i]))
1323:         {
1324:           xfclose (fps[i]);
1325:           --nfps;
1326:           for (j = i; j < nfps; ++j)
1327:             fps[j] = fps[j + 1];
1328:         }
1329:       if (i == nfps)
1330:         free (buffer[i].buf);
1331:       else
1332:         {
1333:           initlines (&lines[i], mergealloc / linelength + 1,
1334:                      LINEALLOC / ((NMERGE + NMERGE) * sizeof (struct line)));
1335:           findlines (&buffer[i], &lines[i]);
1336:           cur[i] = 0;
1337:         }
1338:     }
1339: 
1340:   /* Set up the ord table according to comparisons among input lines.
1341:      Since this only reorders two items if one is strictly greater than
1342:      the other, it is stable. */
1343:   for (i = 0; i < nfps; ++i)
1344:     ord[i] = i;
1345:   for (i = 1; i < nfps; ++i)
1346:     if (compare (&lines[ord[i - 1]].lines[cur[ord[i - 1]]],
1347:                  &lines[ord[i]].lines[cur[ord[i]]]) > 0)
1348:       t = ord[i - 1], ord[i - 1] = ord[i], ord[i] = t, i = 0;
1349: 
1350:   /* Repeatedly output the smallest line until no input remains. */
1351:   while (nfps)
1352:     {
1353:       /* If uniqified output is turned on, output only the first of
1354:          an identical series of lines. */
1355:       if (unique)
1356:         {
1357:           if (savedflag && compare (&saved, &lines[ord[0]].lines[cur[ord[0]]]))
1358:             {
1359:               write_bytes (saved.text, saved.length, ofp);
1360:               putc (eolchar, ofp);
1361:               savedflag = 0;
1362:             }
1363:           if (!savedflag)
1364:             {
1365:               if (savealloc < lines[ord[0]].lines[cur[ord[0]]].length + 1)
1366:                 {
1367:                   while (savealloc < lines[ord[0]].lines[cur[ord[0]]].length + 1)
1368:                     savealloc *= 2;
1369:                   saved.text = xrealloc (saved.text, savealloc);
1370:                 }
1371:               saved.length = lines[ord[0]].lines[cur[ord[0]]].length;
1372:               memcpy (saved.text, lines[ord[0]].lines[cur[ord[0]]].text,
1373:                      saved.length + 1);
1374:               if (lines[ord[0]].lines[cur[ord[0]]].keybeg != NULL)
1375:                 {
1376:                   saved.keybeg = saved.text +
1377:                     (lines[ord[0]].lines[cur[ord[0]]].keybeg
1378:                      - lines[ord[0]].lines[cur[ord[0]]].text);
1379:                 }
1380:               if (lines[ord[0]].lines[cur[ord[0]]].keylim != NULL)
1381:                 {
1382:                   saved.keylim = saved.text +
1383:                     (lines[ord[0]].lines[cur[ord[0]]].keylim
1384:                      - lines[ord[0]].lines[cur[ord[0]]].text);
1385:                 }
1386:               savedflag = 1;
1387:             }
1388:         }
1389:       else
1390:         {
1391:           write_bytes (lines[ord[0]].lines[cur[ord[0]]].text,
1392:                        lines[ord[0]].lines[cur[ord[0]]].length, ofp);
1393:           putc (eolchar, ofp);
1394:         }
1395: 
1396:       /* Check if we need to read more lines into core. */
1397:       if (++cur[ord[0]] == lines[ord[0]].used)
1398:         if (fillbuf (&buffer[ord[0]], fps[ord[0]]))
1399:           {
1400:             findlines (&buffer[ord[0]], &lines[ord[0]]);
1401:             cur[ord[0]] = 0;
1402:           }
1403:         else
1404:           {
1405:             /* We reached EOF on fps[ord[0]]. */
1406:             for (i = 1; i < nfps; ++i)
1407:               if (ord[i] > ord[0])
1408:                 --ord[i];
1409:             --nfps;
1410:             xfclose (fps[ord[0]]);
1411:             free (buffer[ord[0]].buf);
1412:             free ((char *) lines[ord[0]].lines);
1413:             for (i = ord[0]; i < nfps; ++i)
1414:               {
1415:                 fps[i] = fps[i + 1];
1416:                 buffer[i] = buffer[i + 1];
1417:                 lines[i] = lines[i + 1];
1418:                 cur[i] = cur[i + 1];
1419:               }
1420:             for (i = 0; i < nfps; ++i)
1421:               ord[i] = ord[i + 1];
1422:             continue;
1423:           }
1424: 
1425:       /* The new line just read in may be larger than other lines
1426:          already in core; push it back in the queue until we encounter
1427:          a line larger than it. */
1428:       for (i = 1; i < nfps; ++i)
1429:         {
1430:           t = compare (&lines[ord[0]].lines[cur[ord[0]]],
1431:                        &lines[ord[i]].lines[cur[ord[i]]]);
1432:           if (!t)
1433:             t = ord[0] - ord[i];
1434:           if (t < 0)
1435:             break;
1436:         }
1437:       t = ord[0];
1438:       for (j = 1; j < i; ++j)
1439:         ord[j - 1] = ord[j];
1440:       ord[i - 1] = t;
1441:     }
1442: 
1443:   if (unique && savedflag)
1444:     {
1445:       write_bytes (saved.text, saved.length, ofp);
1446:       putc (eolchar, ofp);
1447:       free (saved.text);
1448:     }
1449: }
1450: 

Define merge()
Function definition
Segment Source
1512: /* Merge NFILES FILES onto OFP. */
1513: 
1514: static void
1515: merge (char **files, int nfiles, FILE *ofp)
1516: {
1517:   int i, j, t;
1518:   char *temp;
1519:   FILE *fps[NMERGE], *tfp;
1520: 
1521:   while (nfiles > NMERGE)
1522:     {
1523:       t = 0;
1524:       for (i = 0; i < nfiles / NMERGE; ++i)
1525:         {
1526:           for (j = 0; j < NMERGE; ++j)
1527:             fps[j] = xfopen (files[i * NMERGE + j], "r");
1528:           tfp = xtmpfopen (temp = tempname ());
1529:           mergefps (fps, NMERGE, tfp);
1530:           xfclose (tfp);
1531:           for (j = 0; j < NMERGE; ++j)
1532:             zaptemp (files[i * NMERGE + j]);
1533:           files[t++] = temp;
1534:         }
1535:       for (j = 0; j < nfiles % NMERGE; ++j)
1536:         fps[j] = xfopen (files[i * NMERGE + j], "r");
1537:       tfp = xtmpfopen (temp = tempname ());
1538:       mergefps (fps, nfiles % NMERGE, tfp);
1539:       xfclose (tfp);
1540:       for (j = 0; j < nfiles % NMERGE; ++j)
1541:         zaptemp (files[i * NMERGE + j]);
1542:       files[t++] = temp;
1543:       nfiles = t;
1544:     }
1545: 
1546:   for (i = 0; i < nfiles; ++i)
1547:     fps[i] = xfopen (files[i], "r");
1548:   mergefps (fps, i, ofp);
1549:   for (i = 0; i < nfiles; ++i)
1550:     zaptemp (files[i]);
1551: }
1552: