IDZEBRA 2.2.8
merge.c
Go to the documentation of this file.
1/* This file is part of the Zebra server.
2 Copyright (C) Index Data
3
4Zebra is free software; you can redistribute it and/or modify it under
5the terms of the GNU General Public License as published by the Free
6Software Foundation; either version 2, or (at your option) any later
7version.
8
9Zebra is distributed in the hope that it will be useful, but WITHOUT ANY
10WARRANTY; without even the implied warranty of MERCHANTABILITY or
11FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12for more details.
13
14You should have received a copy of the GNU General Public License
15along with this program; if not, write to the Free Software
16Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17
18*/
19
20#if HAVE_CONFIG_H
21#include <config.h>
22#endif
23#include <stdlib.h>
24#include <assert.h>
25#include <string.h>
26#include <stdio.h>
27#include <yaz/log.h>
28#include "isamc-p.h"
29
31 int offset; /* offset in r_buf */
32 zint block; /* block number of file (0 if none) */
33 int dirty; /* block is different from that on file */
34};
35
#if 0
/* Disabled helper: when at least 3/4 of the first `ptr` entries are dirty,
   reorder their file block numbers into ascending order and mark every
   entry dirty.  Unless `last` is set, the entry following them (mb[ptr])
   is marked dirty as well.  `is` is not used. */
static void opt_blocks (ISAMC is, struct isamc_merge_block *mb, int ptr,
                        int last)
{
    int i, no_dirty = 0;
    for (i = 0; i<ptr; i++)
        if (mb[i].dirty)
            no_dirty++;
    if (no_dirty*4 < ptr*3)
        return;
    /* selection sort on block number */
    for (i = 0; i<ptr; i++)
    {
        zint tmp;   /* block numbers are zint; an int would truncate them */
        int j, j_min = -1;
        for (j = i; j<ptr; j++)
        {
            if (j_min < 0 || mb[j_min].block > mb[j].block)
                j_min = j;
        }
        assert (j_min >= 0);
        tmp = mb[j_min].block;
        mb[j_min].block = mb[i].block;
        mb[i].block = tmp;
        mb[i].dirty = 1;
    }
    if (!last)
        mb[ptr].dirty = 1;  /* i == ptr here: the entry after the sorted run */
}
#endif
65
66static void flush_blocks (ISAMC is, struct isamc_merge_block *mb, int ptr,
67 char *r_buf, zint *firstpos, int cat, int last,
68 zint *numkeys)
69{
70 int i;
71
72 for (i = 0; i<ptr; i++)
73 {
74 /* consider this block number */
75 if (!mb[i].block)
76 {
77 mb[i].block = isamc_alloc_block (is, cat);
78 mb[i].dirty = 1;
79 }
80
81 /* consider next block pointer */
82 if (last && i == ptr-1)
83 mb[i+1].block = 0;
84 else if (!mb[i+1].block)
85 {
86 mb[i+1].block = isamc_alloc_block (is, cat);
87 mb[i+1].dirty = 1;
88 mb[i].dirty = 1;
89 }
90 }
91
92 for (i = 0; i<ptr; i++)
93 {
94 char *src;
95 ISAMC_BLOCK_SIZE ssize = mb[i+1].offset - mb[i].offset;
96
97 assert (ssize);
98
99 /* skip rest if not dirty */
100 if (!mb[i].dirty)
101 {
102 assert (mb[i].block);
103 if (!*firstpos)
104 *firstpos = mb[i].block;
105 if (is->method->debug > 2)
106 yaz_log (YLOG_LOG, "isc: skip ptr=%d size=%d %d " ZINT_FORMAT,
107 i, ssize, cat, mb[i].block);
108 ++(is->files[cat].no_skip_writes);
109 continue;
110 }
111 /* write block */
112
113 if (!*firstpos)
114 {
115 *firstpos = mb[i].block;
116 src = r_buf + mb[i].offset - ISAMC_BLOCK_OFFSET_1;
117 ssize += ISAMC_BLOCK_OFFSET_1;
118
119 memcpy (src+sizeof(zint)+sizeof(ssize), numkeys, sizeof(*numkeys));
120 if (is->method->debug > 2)
121 yaz_log (YLOG_LOG, "isc: flush ptr=%d numk=" ZINT_FORMAT " size=%d nextpos="
122 ZINT_FORMAT, i, *numkeys, (int) ssize, mb[i+1].block);
123 }
124 else
125 {
126 src = r_buf + mb[i].offset - ISAMC_BLOCK_OFFSET_N;
127 ssize += ISAMC_BLOCK_OFFSET_N;
128 if (is->method->debug > 2)
129 yaz_log (YLOG_LOG, "isc: flush ptr=%d size=%d nextpos=" ZINT_FORMAT,
130 i, (int) ssize, mb[i+1].block);
131 }
132 memcpy (src, &mb[i+1].block, sizeof(zint));
133 memcpy (src+sizeof(zint), &ssize, sizeof(ssize));
134 isamc_write_block (is, cat, mb[i].block, src);
135 }
136}
137
138static int get_border (ISAMC is, struct isamc_merge_block *mb, zint ptr,
139 int cat, zint firstpos)
140{
141 /* Border set to initial fill or block size depending on
142 whether we are creating a new one or updating and old one.
143 */
144
145 int fill = mb[ptr].block ? is->method->filecat[cat].bsize :
146 is->method->filecat[cat].ifill;
147 int off = (ptr||firstpos) ? ISAMC_BLOCK_OFFSET_N : ISAMC_BLOCK_OFFSET_1;
148
149 assert (ptr < 199);
150
151 return mb[ptr].offset + fill - off;
152}
153
154void isamc_merge (ISAMC is, ISAM_P *ipos, ISAMC_I *data)
155{
156
157 char i_item[128], *i_item_ptr;
158 int i_more, i_mode, i;
159
160 ISAMC_PP pp;
161 char f_item[128], *f_item_ptr;
162 int f_more;
163 int last_dirty = 0;
164 int debug = is->method->debug;
165
166 struct isamc_merge_block mb[200];
167
168 zint firstpos = 0;
169 int cat = 0;
170 char r_item_buf[128]; /* temporary result output */
171 char *r_buf; /* block with resulting data */
172 int r_offset = 0; /* current offset in r_buf */
173 int ptr = 0; /* pointer */
174 void *r_clientData; /* encode client data */
175 zint border;
176 zint numKeys = 0;
177
178 r_clientData = (*is->method->codec.start)();
179 r_buf = is->merge_buf + 128;
180
181 pp = isamc_pp_open (is, *ipos);
182 /* read first item from file. make sure f_more indicates no boundary */
183 f_item_ptr = f_item;
184 f_more = isamc_read_item (pp, &f_item_ptr);
185 if (f_more > 0)
186 f_more = 1;
187 cat = pp->cat;
188
189 if (debug > 1)
190 yaz_log (YLOG_LOG, "isc: isamc_merge begin %d " ZINT_FORMAT, cat, pp->pos);
191
192 /* read first item from i */
193 i_item_ptr = i_item;
194 i_more = (*data->read_item)(data->clientData, &i_item_ptr, &i_mode);
195
196 mb[ptr].block = pp->pos; /* is zero if no block on disk */
197 mb[ptr].dirty = 0;
198 mb[ptr].offset = 0;
199
200 border = get_border (is, mb, ptr, cat, firstpos);
201 while (i_more || f_more)
202 {
203 char *r_item = r_item_buf;
204 int cmp;
205
206 if (f_more > 1)
207 {
208 /* block to block boundary in the original file. */
209 f_more = 1;
210 if (cat == pp->cat)
211 {
212 /* the resulting output is of the same category as the
213 the original
214 */
215 if (r_offset <= mb[ptr].offset +is->method->filecat[cat].mfill)
216 {
217 /* the resulting output block is too small/empty. Delete
218 the original (if any)
219 */
220 if (debug > 3)
221 yaz_log (YLOG_LOG, "isc: release A");
222 if (mb[ptr].block)
223 isamc_release_block (is, pp->cat, mb[ptr].block);
224 mb[ptr].block = pp->pos;
225 if (!mb[ptr].dirty)
226 mb[ptr].dirty = 1;
227 if (ptr > 0)
228 mb[ptr-1].dirty = 1;
229 }
230 else
231 {
232 /* indicate new boundary based on the original file */
233 mb[++ptr].block = pp->pos;
234 mb[ptr].dirty = last_dirty;
235 mb[ptr].offset = r_offset;
236 if (debug > 3)
237 yaz_log (YLOG_LOG, "isc: bound ptr=%d,offset=%d",
238 ptr, r_offset);
239 if (cat==is->max_cat && ptr >= is->method->max_blocks_mem)
240 {
241 /* We are dealing with block(s) of max size. Block(s)
242 except 1 will be flushed.
243 */
244 if (debug > 2)
245 yaz_log (YLOG_LOG, "isc: flush A %d sections", ptr);
246 flush_blocks (is, mb, ptr-1, r_buf, &firstpos, cat,
247 0, &pp->numKeys);
248 mb[0].block = mb[ptr-1].block;
249 mb[0].dirty = mb[ptr-1].dirty;
250 memcpy (r_buf, r_buf + mb[ptr-1].offset,
251 mb[ptr].offset - mb[ptr-1].offset);
252 mb[0].offset = 0;
253
254 mb[1].block = mb[ptr].block;
255 mb[1].dirty = mb[ptr].dirty;
256 mb[1].offset = mb[ptr].offset - mb[ptr-1].offset;
257 ptr = 1;
258 r_offset = mb[ptr].offset;
259 }
260 }
261 }
262 border = get_border (is, mb, ptr, cat, firstpos);
263 }
264 last_dirty = 0;
265 if (!f_more)
266 cmp = -1;
267 else if (!i_more)
268 cmp = 1;
269 else
270 cmp = (*is->method->compare_item)(i_item, f_item);
271 if (cmp == 0) /* insert i=f */
272 {
273 if (!i_mode) /* delete item? */
274 {
275 /* move i */
276 i_item_ptr = i_item;
277 i_more = (*data->read_item)(data->clientData, &i_item_ptr,
278 &i_mode);
279 /* is next input item the same as current except
280 for the delete flag? */
281 cmp = (*is->method->compare_item)(i_item, f_item);
282 if (!cmp && i_mode) /* delete/insert nop? */
283 {
284 /* yes! insert as if it was an insert only */
285 memcpy (r_item, i_item, i_item_ptr - i_item);
286 i_item_ptr = i_item;
287 i_more = (*data->read_item)(data->clientData, &i_item_ptr,
288 &i_mode);
289 }
290 else
291 {
292 /* no! delete the item */
293 r_item = NULL;
294 last_dirty = 1;
295 mb[ptr].dirty = 2;
296 }
297 }
298 else
299 {
300 memcpy (r_item, f_item, f_item_ptr - f_item);
301
302 /* move i */
303 i_item_ptr = i_item;
304 i_more = (*data->read_item)(data->clientData, &i_item_ptr,
305 &i_mode);
306 }
307 /* move f */
308 f_item_ptr = f_item;
309 f_more = isamc_read_item (pp, &f_item_ptr);
310 }
311 else if (cmp > 0) /* insert f */
312 {
313 memcpy (r_item, f_item, f_item_ptr - f_item);
314 /* move f */
315 f_item_ptr = f_item;
316 f_more = isamc_read_item (pp, &f_item_ptr);
317 }
318 else /* insert i */
319 {
320 if (!i_mode) /* delete item which isn't there? */
321 {
322 yaz_log (YLOG_FATAL, "Inconsistent register at offset %d",
323 r_offset);
324 abort ();
325 }
326 memcpy (r_item, i_item, i_item_ptr - i_item);
327 mb[ptr].dirty = 2;
328 last_dirty = 1;
329 /* move i */
330 i_item_ptr = i_item;
331 i_more = (*data->read_item)(data->clientData, &i_item_ptr,
332 &i_mode);
333 }
334 if (r_item) /* insert resulting item? */
335 {
336 char *r_out_ptr = r_buf + r_offset;
337 const char *src = r_item;
338 int new_offset;
339
340 (*is->method->codec.encode)(r_clientData, &r_out_ptr, &src);
341 new_offset = r_out_ptr - r_buf;
342
343 numKeys++;
344
345 if (border < new_offset && border >= r_offset)
346 {
347 if (debug > 2)
348 yaz_log (YLOG_LOG, "isc: border %d " ZINT_FORMAT,
349 ptr, border);
350 /* Max size of current block category reached ...
351 make new virtual block entry */
352 mb[++ptr].block = 0;
353 mb[ptr].dirty = 1;
354 mb[ptr].offset = r_offset;
355 if (cat == is->max_cat && ptr >= is->method->max_blocks_mem)
356 {
357 /* We are dealing with block(s) of max size. Block(s)
358 except one will be flushed. Note: the block(s) are
359 surely not the last one(s).
360 */
361 if (debug > 2)
362 yaz_log (YLOG_LOG, "isc: flush B %d sections", ptr-1);
363 flush_blocks (is, mb, ptr-1, r_buf, &firstpos, cat,
364 0, &pp->numKeys);
365 mb[0].block = mb[ptr-1].block;
366 mb[0].dirty = mb[ptr-1].dirty;
367 memcpy (r_buf, r_buf + mb[ptr-1].offset,
368 mb[ptr].offset - mb[ptr-1].offset);
369 mb[0].offset = 0;
370
371 mb[1].block = mb[ptr].block;
372 mb[1].dirty = mb[0].dirty;
373 mb[1].offset = mb[ptr].offset - mb[ptr-1].offset;
374 memcpy (r_buf + mb[1].offset, r_buf + r_offset,
375 new_offset - r_offset);
376 new_offset = (new_offset - r_offset) + mb[1].offset;
377 ptr = 1;
378 }
379 border = get_border (is, mb, ptr, cat, firstpos);
380 }
381 r_offset = new_offset;
382 }
383 if (cat < is->max_cat && ptr >= is->method->filecat[cat].mblocks)
384 {
385 /* Max number blocks in current category reached ->
386 must switch to next category (with larger block size)
387 */
388 int j = 0;
389
390 (is->files[cat].no_remap)++;
391 /* delete all original block(s) read so far */
392 for (i = 0; i < ptr; i++)
393 if (mb[i].block)
394 isamc_release_block (is, pp->cat, mb[i].block);
395 /* also delete all block to be read in the future */
396 pp->deleteFlag = 1;
397
398 /* remap block offsets */
399 assert (mb[j].offset == 0);
400 cat++;
401 mb[j].dirty = 1;
402 mb[j].block = 0;
403 mb[ptr].offset = r_offset;
404 for (i = 1; i < ptr; i++)
405 {
406 int border = is->method->filecat[cat].ifill -
408 if (debug > 3)
409 yaz_log (YLOG_LOG, "isc: remap %d border=%d", i, border);
410 if (mb[i+1].offset > border && mb[i].offset <= border)
411 {
412 if (debug > 3)
413 yaz_log (YLOG_LOG, "isc: to %d %d", j, mb[i].offset);
414 mb[++j].dirty = 1;
415 mb[j].block = 0;
416 mb[j].offset = mb[i].offset;
417 }
418 }
419 if (debug > 2)
420 yaz_log (YLOG_LOG, "isc: remap from %d to %d sections to cat %d",
421 ptr, j, cat);
422 ptr = j;
423 border = get_border (is, mb, ptr, cat, firstpos);
424 if (debug > 3)
425 yaz_log (YLOG_LOG, "isc: border=" ZINT_FORMAT " r_offset=%d",
426 border, r_offset);
427 }
428 }
429 if (mb[ptr].offset < r_offset)
430 { /* make the final boundary offset */
431 mb[++ptr].dirty = 1;
432 mb[ptr].block = 0;
433 mb[ptr].offset = r_offset;
434 }
435 else
436 { /* empty output. Release last block if any */
437 if (cat == pp->cat && mb[ptr].block)
438 {
439 if (debug > 3)
440 yaz_log (YLOG_LOG, "isc: release C");
441 isamc_release_block (is, pp->cat, mb[ptr].block);
442 mb[ptr].block = 0;
443 if (ptr > 0)
444 mb[ptr-1].dirty = 1;
445 }
446 }
447
448 if (debug > 2)
449 yaz_log (YLOG_LOG, "isc: flush C, %d sections", ptr);
450
451 if (firstpos)
452 {
453 /* we have to patch initial block with num keys if that
454 has changed */
455 if (numKeys != isamc_pp_num (pp))
456 {
457 if (debug > 2)
458 yaz_log (YLOG_LOG, "isc: patch num keys firstpos=" ZINT_FORMAT " num=" ZINT_FORMAT,
459 firstpos, numKeys);
460 bf_write (is->files[cat].bf, firstpos, ISAMC_BLOCK_OFFSET_N,
461 sizeof(numKeys), &numKeys);
462 }
463 }
464 else if (ptr > 0)
465 { /* we haven't flushed initial block yet and there surely are some
466 blocks to flush. Make first block dirty if numKeys differ */
467 if (numKeys != isamc_pp_num (pp))
468 mb[0].dirty = 1;
469 }
470 /* flush rest of block(s) in r_buf */
471 flush_blocks (is, mb, ptr, r_buf, &firstpos, cat, 1, &numKeys);
472
473 (*is->method->codec.stop)(r_clientData);
474 if (!firstpos)
475 cat = 0;
476 if (debug > 1)
477 yaz_log (YLOG_LOG, "isc: isamc_merge return %d " ZINT_FORMAT, cat, firstpos);
478 isamc_pp_close (pp);
479 *ipos = cat + firstpos * 8;
480}
481
482/*
483 * Local variables:
484 * c-basic-offset: 4
485 * c-file-style: "Stroustrup"
486 * indent-tabs-mode: nil
487 * End:
488 * vim: shiftwidth=4 tabstop=8 expandtab
489 */
490
int bf_write(BFile bf, zint no, int offset, int nbytes, const void *buf)
writes block of bytes to file (may call exit)
Definition bfile.c:232
int isamc_write_block(ISAMC is, int cat, zint pos, char *src)
Definition isamc.c:244
#define ISAMC_BLOCK_OFFSET_N
Definition isamc-p.h:94
unsigned ISAMC_BLOCK_SIZE
Definition isamc-p.h:32
zint isamc_alloc_block(ISAMC is, int cat)
Definition isamc.c:394
void isamc_release_block(ISAMC is, int cat, zint pos)
Definition isamc.c:417
#define ISAMC_BLOCK_OFFSET_1
Definition isamc-p.h:93
zint isamc_pp_num(ISAMC_PP pp)
Definition isamc.c:587
int isamc_read_item(ISAMC_PP pp, char **dst)
Definition isamc.c:522
void isamc_pp_close(ISAMC_PP pp)
Definition isamc.c:458
ISAMC_PP isamc_pp_open(ISAMC is, ISAM_P pos)
Definition isamc.c:467
zint ISAM_P
Definition isamc.h:28
static void flush_blocks(ISAMC is, struct isamc_merge_block *mb, int ptr, char *r_buf, zint *firstpos, int cat, int last, zint *numkeys)
Definition merge.c:66
void isamc_merge(ISAMC is, ISAM_P *ipos, ISAMC_I *data)
Definition merge.c:154
static int get_border(ISAMC is, struct isamc_merge_block *mb, zint ptr, int cat, zint firstpos)
Definition merge.c:138
int(* read_item)(void *clientData, char **dst, int *insertMode)
Definition isamc.h:53
void * clientData
Definition isamc.h:54
int(* compare_item)(const void *a, const void *b)
Definition isamc.h:43
ISAMC_filecat filecat
Definition isamc.h:41
ISAM_CODEC codec
Definition isamc.h:46
int max_blocks_mem
Definition isamc.h:48
int debug
Definition isamc.h:49
zint pos
Definition isamc-p.h:74
zint numKeys
Definition isamc-p.h:79
int cat
Definition isamc-p.h:73
int deleteFlag
Definition isamc-p.h:78
int no_skip_writes
Definition isamc-p.h:41
int no_remap
Definition isamc-p.h:44
BFile bf
Definition isamc-p.h:36
char * merge_buf
Definition isamc-p.h:64
int max_cat
Definition isamc-p.h:63
ISAMC_file files
Definition isamc-p.h:66
ISAMC_M * method
Definition isamc-p.h:65
void(* stop)(void *p)
Definition isam-codec.h:25
void *(* start)(void)
Definition isam-codec.h:24
void(* encode)(void *p, char **dst, const char **src)
Definition isam-codec.h:27
long zint
Zebra integer.
Definition util.h:66
#define ZINT_FORMAT
Definition util.h:72