000001 /* 000002 ** 2011-07-09 000003 ** 000004 ** The author disclaims copyright to this source code. In place of 000005 ** a legal notice, here is a blessing: 000006 ** 000007 ** May you do good and not evil. 000008 ** May you find forgiveness for yourself and forgive others. 000009 ** May you share freely, never taking more than you give. 000010 ** 000011 ************************************************************************* 000012 ** This file contains code for the VdbeSorter object, used in concert with 000013 ** a VdbeCursor to sort large numbers of keys for CREATE INDEX statements 000014 ** or by SELECT statements with ORDER BY clauses that cannot be satisfied 000015 ** using indexes and without LIMIT clauses. 000016 ** 000017 ** The VdbeSorter object implements a multi-threaded external merge sort 000018 ** algorithm that is efficient even if the number of elements being sorted 000019 ** exceeds the available memory. 000020 ** 000021 ** Here is the (internal, non-API) interface between this module and the 000022 ** rest of the SQLite system: 000023 ** 000024 ** sqlite3VdbeSorterInit() Create a new VdbeSorter object. 000025 ** 000026 ** sqlite3VdbeSorterWrite() Add a single new row to the VdbeSorter 000027 ** object. The row is a binary blob in the 000028 ** OP_MakeRecord format that contains both 000029 ** the ORDER BY key columns and result columns 000030 ** in the case of a SELECT w/ ORDER BY, or 000031 ** the complete record for an index entry 000032 ** in the case of a CREATE INDEX. 000033 ** 000034 ** sqlite3VdbeSorterRewind() Sort all content previously added. 000035 ** Position the read cursor on the 000036 ** first sorted element. 000037 ** 000038 ** sqlite3VdbeSorterNext() Advance the read cursor to the next sorted 000039 ** element. 000040 ** 000041 ** sqlite3VdbeSorterRowkey() Return the complete binary blob for the 000042 ** row currently under the read cursor. 
000043 ** 000044 ** sqlite3VdbeSorterCompare() Compare the binary blob for the row 000045 ** currently under the read cursor against 000046 ** another binary blob X and report if 000047 ** X is strictly less than the read cursor. 000048 ** Used to enforce uniqueness in a 000049 ** CREATE UNIQUE INDEX statement. 000050 ** 000051 ** sqlite3VdbeSorterClose() Close the VdbeSorter object and reclaim 000052 ** all resources. 000053 ** 000054 ** sqlite3VdbeSorterReset() Refurbish the VdbeSorter for reuse. This 000055 ** is like Close() followed by Init() only 000056 ** much faster. 000057 ** 000058 ** The interfaces above must be called in a particular order. Write() can 000059 ** only occur in between Init()/Reset() and Rewind(). Next(), Rowkey(), and 000060 ** Compare() can only occur in between Rewind() and Close()/Reset(). i.e. 000061 ** 000062 ** Init() 000063 ** for each record: Write() 000064 ** Rewind() 000065 ** Rowkey()/Compare() 000066 ** Next() 000067 ** Close() 000068 ** 000069 ** Algorithm: 000070 ** 000071 ** Records passed to the sorter via calls to Write() are initially held 000072 ** unsorted in main memory. Assuming the amount of memory used never exceeds 000073 ** a threshold, when Rewind() is called the set of records is sorted using 000074 ** an in-memory merge sort. In this case, no temporary files are required 000075 ** and subsequent calls to Rowkey(), Next() and Compare() read records 000076 ** directly from main memory. 000077 ** 000078 ** If the amount of space used to store records in main memory exceeds the 000079 ** threshold, then the set of records currently in memory are sorted and 000080 ** written to a temporary file in "Packed Memory Array" (PMA) format. 000081 ** A PMA created at this point is known as a "level-0 PMA". Higher levels 000082 ** of PMAs may be created by merging existing PMAs together - for example 000083 ** merging two or more level-0 PMAs together creates a level-1 PMA. 
000084 ** 000085 ** The threshold for the amount of main memory to use before flushing 000086 ** records to a PMA is roughly the same as the limit configured for the 000087 ** page-cache of the main database. Specifically, the threshold is set to 000088 ** the value returned by "PRAGMA main.page_size" multiplied by 000089 ** that returned by "PRAGMA main.cache_size", in bytes. 000090 ** 000091 ** If the sorter is running in single-threaded mode, then all PMAs generated 000092 ** are appended to a single temporary file. Or, if the sorter is running in 000093 ** multi-threaded mode then up to (N+1) temporary files may be opened, where 000094 ** N is the configured number of worker threads. In this case, instead of 000095 ** sorting the records and writing the PMA to a temporary file itself, the 000096 ** calling thread usually launches a worker thread to do so. Except, if 000097 ** there are already N worker threads running, the main thread does the work 000098 ** itself. 000099 ** 000100 ** The sorter is running in multi-threaded mode if (a) the library was built 000101 ** with pre-processor symbol SQLITE_MAX_WORKER_THREADS set to a value greater 000102 ** than zero, and (b) worker threads have been enabled at runtime by calling 000103 ** "PRAGMA threads=N" with some value of N greater than 0. 000104 ** 000105 ** When Rewind() is called, any data remaining in memory is flushed to a 000106 ** final PMA. So at this point the data is stored in some number of sorted 000107 ** PMAs within temporary files on disk. 000108 ** 000109 ** If there are fewer than SORTER_MAX_MERGE_COUNT PMAs in total and the 000110 ** sorter is running in single-threaded mode, then these PMAs are merged 000111 ** incrementally as keys are retrieved from the sorter by the VDBE. The 000112 ** MergeEngine object, described in further detail below, performs this 000113 ** merge. 
000114 ** 000115 ** Or, if running in multi-threaded mode, then a background thread is 000116 ** launched to merge the existing PMAs. Once the background thread has 000117 ** merged T bytes of data into a single sorted PMA, the main thread 000118 ** begins reading keys from that PMA while the background thread proceeds 000119 ** with merging the next T bytes of data. And so on. 000120 ** 000121 ** Parameter T is set to half the value of the memory threshold used 000122 ** by Write() above to determine when to create a new PMA. 000123 ** 000124 ** If there are more than SORTER_MAX_MERGE_COUNT PMAs in total when 000125 ** Rewind() is called, then a hierarchy of incremental-merges is used. 000126 ** First, T bytes of data from the first SORTER_MAX_MERGE_COUNT PMAs on 000127 ** disk are merged together. Then T bytes of data from the second set, and 000128 ** so on, such that no operation ever merges more than SORTER_MAX_MERGE_COUNT 000129 ** PMAs at a time. This is done to improve locality. 000130 ** 000131 ** If running in multi-threaded mode and there are more than 000132 ** SORTER_MAX_MERGE_COUNT PMAs on disk when Rewind() is called, then more 000133 ** than one background thread may be created. Specifically, there may be 000134 ** one background thread for each temporary file on disk, and one background 000135 ** thread to merge the output of each of the others to a single PMA for 000136 ** the main thread to read from. 000137 */ 000138 #include "sqliteInt.h" 000139 #include "vdbeInt.h" 000140 000141 /* 000142 ** If SQLITE_DEBUG_SORTER_THREADS is defined, this module outputs various 000143 ** messages to stderr that may be helpful in understanding the performance 000144 ** characteristics of the sorter in multi-threaded mode. 000145 */ 000146 #if 0 000147 # define SQLITE_DEBUG_SORTER_THREADS 1 000148 #endif 000149 000150 /* 000151 ** Hard-coded maximum amount of data to accumulate in memory before flushing 000152 ** to a level 0 PMA. 
The purpose of this limit is to prevent various integer 000153 ** overflows. 512MiB. 000154 */ 000155 #define SQLITE_MAX_PMASZ (1<<29) 000156 000157 /* 000158 ** Private objects used by the sorter. These typedefs are forward declarations; each structure is defined further down in this file. 000159 */ 000160 typedef struct MergeEngine MergeEngine; /* Merge PMAs together */ 000161 typedef struct PmaReader PmaReader; /* Incrementally read one PMA */ 000162 typedef struct PmaWriter PmaWriter; /* Incrementally write one PMA */ 000163 typedef struct SorterRecord SorterRecord; /* A record being sorted */ 000164 typedef struct SortSubtask SortSubtask; /* A sub-task in the sort process */ 000165 typedef struct SorterFile SorterFile; /* Temporary file object wrapper */ 000166 typedef struct SorterList SorterList; /* In-memory list of records */ 000167 typedef struct IncrMerger IncrMerger; /* Read & merge multiple PMAs */ 000168 000169 /* 000170 ** A container for a temp file handle and the current amount of data 000171 ** stored in the file. 000172 */ 000173 struct SorterFile { 000174 sqlite3_file *pFd; /* File handle */ 000175 i64 iEof; /* Bytes of data stored in pFd */ 000176 }; 000177 000178 /* 000179 ** An in-memory list of objects to be sorted. 000180 ** 000181 ** If aMemory==0 then each object is allocated separately and the objects 000182 ** are connected using SorterRecord.u.pNext. If aMemory!=0 then all objects 000183 ** are stored in the aMemory[] bulk memory, one right after the other, and 000184 ** are connected using SorterRecord.u.iNext. 000185 */ 000186 struct SorterList { 000187 SorterRecord *pList; /* Linked list of records */ 000188 u8 *aMemory; /* If non-NULL, bulk memory to hold pList */ 000189 int szPMA; /* Size of pList as PMA in bytes */ 000190 }; 000191 000192 /* 000193 ** The MergeEngine object is used to combine two or more smaller PMAs into 000194 ** one big PMA using a merge operation. Separate PMAs all need to be 000195 ** combined into one big PMA in order to be able to step through the sorted 000196 ** records in order. 
000197 ** 000198 ** The aReadr[] array contains a PmaReader object for each of the PMAs being 000199 ** merged. An aReadr[] object either points to a valid key or else is at EOF. 000200 ** ("EOF" means "End Of File". When aReadr[] is at EOF there is no more data.) 000201 ** For the purposes of the paragraphs below, we assume that the array is 000202 ** actually N elements in size, where N is the smallest power of 2 greater 000203 ** than or equal to the number of PMAs being merged. The extra aReadr[] elements 000204 ** are treated as if they are empty (always at EOF). 000205 ** 000206 ** The aTree[] array is also N elements in size. The value of N is stored in 000207 ** the MergeEngine.nTree variable. 000208 ** 000209 ** The final (N/2) elements of aTree[] contain the results of comparing 000210 ** pairs of PMA keys together. Element i contains the result of 000211 ** comparing aReadr[2*i-N] and aReadr[2*i-N+1]. Whichever key is smaller, the 000212 ** aTree element is set to the index of it. 000213 ** 000214 ** For the purposes of this comparison, EOF is considered greater than any 000215 ** other key value. If the keys are equal (only possible with two EOF 000216 ** values), it doesn't matter which index is stored. 000217 ** 000218 ** The (N/4) elements of aTree[] that precede the final (N/2) described 000219 ** above contain the index of the smallest of each block of 4 PmaReaders. 000220 ** And so on. So that aTree[1] contains the index of the PmaReader that 000221 ** currently points to the smallest key value. aTree[0] is unused. 
000222 ** 000223 ** Example: 000224 ** 000225 ** aReadr[0] -> Banana 000226 ** aReadr[1] -> Feijoa 000227 ** aReadr[2] -> Elderberry 000228 ** aReadr[3] -> Currant 000229 ** aReadr[4] -> Grapefruit 000230 ** aReadr[5] -> Apple 000231 ** aReadr[6] -> Durian 000232 ** aReadr[7] -> EOF 000233 ** 000234 ** aTree[] = { X, 5, 0, 5, 0, 3, 5, 6 } 000235 ** 000236 ** The current element is "Apple" (the value of the key indicated by 000237 ** PmaReader 5). When the Next() operation is invoked, PmaReader 5 will 000238 ** be advanced to the next key in its segment. Say the next key is 000239 ** "Eggplant": 000240 ** 000241 ** aReadr[5] -> Eggplant 000242 ** 000243 ** The contents of aTree[] are updated first by comparing the new PmaReader 000244 ** 5 key to the current key of PmaReader 4 (still "Grapefruit"). The PmaReader 000245 ** 5 value is still smaller, so aTree[6] is set to 5. And so on up the tree. 000246 ** The value of PmaReader 6 - "Durian" - is now smaller than that of PmaReader 000247 ** 5, so aTree[3] is set to 6. Key 0 is smaller than key 6 (Banana<Durian), 000248 ** so the value written into element 1 of the array is 0. As follows: 000249 ** 000250 ** aTree[] = { X, 0, 0, 6, 0, 3, 5, 6 } 000251 ** 000252 ** In other words, each time we advance to the next sorter element, log2(N) 000253 ** key comparison operations are required, where N is the number of segments 000254 ** being merged (rounded up to the next power of 2). 000255 */ 000256 struct MergeEngine { 000257 int nTree; /* Used size of aTree/aReadr (power of 2) */ 000258 SortSubtask *pTask; /* Used by this thread only */ 000259 int *aTree; /* Current state of incremental merge */ 000260 PmaReader *aReadr; /* Array of PmaReaders to merge data from */ 000261 }; 000262 000263 /* 000264 ** This object represents a single thread of control in a sort operation. 000265 ** Exactly VdbeSorter.nTask instances of this object are allocated 000266 ** as part of each VdbeSorter object. 
Instances are never allocated any 000267 ** other way. VdbeSorter.nTask is set to the number of worker threads allowed 000268 ** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread). Thus for 000269 ** single-threaded operation, there is exactly one instance of this object 000270 ** and for multi-threaded operation there are two or more instances. 000271 ** 000272 ** Essentially, this structure contains all those fields of the VdbeSorter 000273 ** structure for which each thread requires a separate instance. For example, 000274 ** each thread requires its own UnpackedRecord object to unpack records into 000275 ** as part of comparison operations. 000276 ** 000277 ** Before a background thread is launched, variable bDone is set to 0. Then, 000278 ** right before it exits, the thread itself sets bDone to 1. This is used for 000279 ** two purposes: 000280 ** 000281 ** 1. When flushing the contents of memory to a level-0 PMA on disk, to 000282 ** attempt to select a SortSubtask for which there is not already an 000283 ** active background thread (since doing so causes the main thread 000284 ** to block until it finishes). 000285 ** 000286 ** 2. If SQLITE_DEBUG_SORTER_THREADS is defined, to determine if a call 000287 ** to sqlite3ThreadJoin() is likely to block. Cases that are likely to 000288 ** block provoke debugging output. 000289 ** 000290 ** In both cases, the effects of the main thread seeing (bDone==0) even 000291 ** after the thread has finished are not dire. So we don't worry about 000292 ** memory barriers and such here. 
000293 */ /* Signature of a key comparison routine stored in SortSubtask.xCompare; vdbeSorterCompare() in this file has the same parameter list. */ 000294 typedef int (*SorterCompare)(SortSubtask*,int*,const void*,int,const void*,int); 000295 struct SortSubtask { 000296 SQLiteThread *pThread; /* Background thread, if any */ 000297 int bDone; /* Set if thread is finished but not joined */ 000298 VdbeSorter *pSorter; /* Sorter that owns this sub-task */ 000299 UnpackedRecord *pUnpacked; /* Space to unpack a record */ 000300 SorterList list; /* List for thread to write to a PMA */ 000301 int nPMA; /* Number of PMAs currently in file */ 000302 SorterCompare xCompare; /* Compare function to use */ 000303 SorterFile file; /* Temp file for level-0 PMAs */ 000304 SorterFile file2; /* Space for other PMAs */ 000305 }; 000306 000307 000308 /* 000309 ** Main sorter structure. A single instance of this is allocated for each 000310 ** sorter cursor created by the VDBE. 000311 ** 000312 ** mxKeysize: 000313 ** As records are added to the sorter by calls to sqlite3VdbeSorterWrite(), 000314 ** this variable is updated so as to be set to the size on disk of the 000315 ** largest record in the sorter. 000316 */ 000317 struct VdbeSorter { 000318 int mnPmaSize; /* Minimum PMA size, in bytes */ 000319 int mxPmaSize; /* Maximum PMA size, in bytes. 
0==no limit */ 000320 int mxKeysize; /* Largest serialized key seen so far */ 000321 int pgsz; /* Main database page size */ 000322 PmaReader *pReader; /* Read data from here after Rewind() */ 000323 MergeEngine *pMerger; /* Or here, if bUseThreads==0 */ 000324 sqlite3 *db; /* Database connection */ 000325 KeyInfo *pKeyInfo; /* How to compare records */ 000326 UnpackedRecord *pUnpacked; /* Used by VdbeSorterCompare() */ 000327 SorterList list; /* List of in-memory records */ 000328 int iMemory; /* Offset of free space in list.aMemory */ 000329 int nMemory; /* Size of list.aMemory allocation in bytes */ 000330 u8 bUsePMA; /* True if one or more PMAs created */ 000331 u8 bUseThreads; /* True to use background threads */ 000332 u8 iPrev; /* Previous thread used to flush PMA */ 000333 u8 nTask; /* Size of aTask[] array */ 000334 u8 typeMask; /* NOTE(review): presumably a mask of the SORTER_TYPE_* values below - confirm */ 000335 SortSubtask aTask[1]; /* One or more subtasks (nTask entries) */ 000336 }; 000337 /* NOTE(review): these look like the bit values stored in VdbeSorter.typeMask - confirm against the code that sets it */ 000338 #define SORTER_TYPE_INTEGER 0x01 000339 #define SORTER_TYPE_TEXT 0x02 000340 000341 /* 000342 ** An instance of the following object is used to read records out of a 000343 ** PMA, in sorted order. The next key to be read is cached in nKey/aKey. 000344 ** aKey might point into aMap or into aBuffer. If neither of those locations 000345 ** contain a contiguous representation of the key, then aAlloc is allocated 000346 ** and the key is copied into aAlloc and aKey is made to point to aAlloc. 000347 ** 000348 ** pFd==0 at EOF. 
000349 */ 000350 struct PmaReader { 000351 i64 iReadOff; /* Current read offset */ 000352 i64 iEof; /* 1 byte past EOF for this PmaReader */ 000353 int nAlloc; /* Bytes of space at aAlloc */ 000354 int nKey; /* Number of bytes in key */ 000355 sqlite3_file *pFd; /* File handle we are reading from */ 000356 u8 *aAlloc; /* Space for aKey if aBuffer and aMap won't work */ 000357 u8 *aKey; /* Pointer to current key */ 000358 u8 *aBuffer; /* Current read buffer */ 000359 int nBuffer; /* Size of read buffer in bytes */ 000360 u8 *aMap; /* Pointer to mapping of entire file, or NULL if the file is not mapped */ 000361 IncrMerger *pIncr; /* Incremental merger, or NULL if none */ 000362 }; 000363 000364 /* 000365 ** Normally, a PmaReader object iterates through an existing PMA stored 000366 ** within a temp file. However, if the PmaReader.pIncr variable points to 000367 ** an object of the following type, it may be used to iterate/merge through 000368 ** multiple PMAs simultaneously. 000369 ** 000370 ** There are two types of IncrMerger object - single-threaded (bUseThread==0) and 000371 ** multi-threaded (bUseThread==1). 000372 ** 000373 ** A multi-threaded IncrMerger object uses two temporary files - aFile[0] 000374 ** and aFile[1]. Neither file is allowed to grow to more than mxSz bytes in 000375 ** size. When the IncrMerger is initialized, it reads enough data from 000376 ** pMerger to populate aFile[0]. It then sets variables within the 000377 ** corresponding PmaReader object to read from that file and kicks off 000378 ** a background thread to populate aFile[1] with the next mxSz bytes of 000379 ** sorted record data from pMerger. 000380 ** 000381 ** When the PmaReader reaches the end of aFile[0], it blocks until the 000382 ** background thread has finished populating aFile[1]. It then exchanges 000383 ** the contents of the aFile[0] and aFile[1] variables within this structure, 000384 ** sets the PmaReader fields to read from the new aFile[0] and kicks off 000385 ** another background thread to populate the new aFile[1]. 
And so on, until 000386 ** the contents of pMerger are exhausted. 000387 ** 000388 ** A single-threaded IncrMerger does not open any temporary files of its 000389 ** own. Instead, it has exclusive access to mxSz bytes of space beginning 000390 ** at offset iStartOff of file pTask->file2. And instead of using a 000391 ** background thread to prepare data for the PmaReader, with a single 000392 ** threaded IncrMerger the allocated part of pTask->file2 is "refilled" with 000393 ** keys from pMerger by the calling thread whenever the PmaReader runs out 000394 ** of data. 000395 */ 000396 struct IncrMerger { 000397 SortSubtask *pTask; /* Task that owns this merger */ 000398 MergeEngine *pMerger; /* Merge engine thread reads data from */ 000399 i64 iStartOff; /* Offset to start writing file at */ 000400 int mxSz; /* Maximum bytes of data to store */ 000401 int bEof; /* Set to true when merge is finished */ 000402 int bUseThread; /* True to use a bg thread for this object */ 000403 SorterFile aFile[2]; /* aFile[0] for reading, [1] for writing */ 000404 }; 000405 000406 /* 000407 ** An instance of this object is used for writing a PMA. 000408 ** 000409 ** The PMA is written one record at a time. Each record is of an arbitrary 000410 ** size. But I/O is more efficient if it occurs in page-sized blocks where 000411 ** each block is aligned on a page boundary. This object caches writes to 000412 ** the PMA so that aligned, page-size blocks are written. 
000413 */ 000414 struct PmaWriter { 000415 int eFWErr; /* Non-zero if in an error state */ 000416 u8 *aBuffer; /* Pointer to write buffer */ 000417 int nBuffer; /* Size of write buffer in bytes */ 000418 int iBufStart; /* First byte of buffer to write */ 000419 int iBufEnd; /* Last byte of buffer to write */ 000420 i64 iWriteOff; /* Offset of start of buffer in file */ 000421 sqlite3_file *pFd; /* File handle to write to */ 000422 }; 000423 000424 /* 000425 ** This object is the header on a single record while that record is being 000426 ** held in memory and prior to being written out as part of a PMA. 000427 ** 000428 ** How the linked list is connected depends on how memory is being managed 000429 ** by this module. If using a separate allocation for each in-memory record 000430 ** (VdbeSorter.list.aMemory==0), then the list is always connected using the 000431 ** SorterRecord.u.pNext pointers. 000432 ** 000433 ** Or, if using the single large allocation method (VdbeSorter.list.aMemory!=0), 000434 ** then while records are being accumulated the list is linked using the 000435 ** SorterRecord.u.iNext offset. This is because the aMemory[] array may 000436 ** be sqlite3Realloc()ed while records are being accumulated. Once the VM 000437 ** has finished passing records to the sorter, or when the in-memory buffer 000438 ** is full, the list is sorted. As part of the sorting process, it is 000439 ** converted to use the SorterRecord.u.pNext pointers. See function 000440 ** vdbeSorterSort() for details. 000441 */ 000442 struct SorterRecord { 000443 int nVal; /* Size of the record in bytes */ 000444 union { 000445 SorterRecord *pNext; /* Pointer to next record in list */ 000446 int iNext; /* Offset within aMemory of next record */ 000447 } u; /* Link to the next record: pNext or iNext, whichever the list currently uses */ 000448 /* The data for the record immediately follows this header */ 000449 }; 000450 000451 /* Return a pointer to the buffer containing the record data for SorterRecord 000452 ** object p. 
Should be used as if: 000453 ** 000454 ** void *SRVAL(SorterRecord *p) { return (void*)&p[1]; } 000455 */ 000456 #define SRVAL(p) ((void*)((SorterRecord*)(p) + 1)) 000457 000458 000459 /* Maximum number of PMAs that a single MergeEngine can merge */ 000460 #define SORTER_MAX_MERGE_COUNT 16 000461 000462 static int vdbeIncrSwap(IncrMerger*); 000463 static void vdbeIncrFree(IncrMerger *); 000464 000465 /* 000466 ** Free all memory belonging to the PmaReader object passed as the 000467 ** argument. All structure fields are set to zero before returning. 000468 */ 000469 static void vdbePmaReaderClear(PmaReader *pReadr){ 000470 sqlite3_free(pReadr->aAlloc); 000471 sqlite3_free(pReadr->aBuffer); 000472 if( pReadr->aMap ) sqlite3OsUnfetch(pReadr->pFd, 0, pReadr->aMap); 000473 vdbeIncrFree(pReadr->pIncr); 000474 memset(pReadr, 0, sizeof(PmaReader)); 000475 } 000476 000477 /* 000478 ** Read the next nByte bytes of data from the PMA p. 000479 ** If successful, set *ppOut to point to a buffer containing the data 000480 ** and return SQLITE_OK. Otherwise, if an error occurs, return an SQLite 000481 ** error code. 000482 ** 000483 ** The buffer returned in *ppOut is only valid until the 000484 ** next call to this function. 000485 */ 000486 static int vdbePmaReadBlob( 000487 PmaReader *p, /* PmaReader from which to take the blob */ 000488 int nByte, /* Bytes of data to read */ 000489 u8 **ppOut /* OUT: Pointer to buffer containing data */ 000490 ){ 000491 int iBuf; /* Offset within buffer to read from */ 000492 int nAvail; /* Bytes of data available in buffer */ 000493 000494 if( p->aMap ){ 000495 *ppOut = &p->aMap[p->iReadOff]; 000496 p->iReadOff += nByte; 000497 return SQLITE_OK; 000498 } 000499 000500 assert( p->aBuffer ); 000501 000502 /* If there is no more data to be read from the buffer, read the next 000503 ** p->nBuffer bytes of data from the file into it. Or, if there are less 000504 ** than p->nBuffer bytes remaining in the PMA, read all remaining data. 
*/ 000505 iBuf = p->iReadOff % p->nBuffer; 000506 if( iBuf==0 ){ 000507 int nRead; /* Bytes to read from disk */ 000508 int rc; /* sqlite3OsRead() return code */ 000509 000510 /* Determine how many bytes of data to read. */ 000511 if( (p->iEof - p->iReadOff) > (i64)p->nBuffer ){ 000512 nRead = p->nBuffer; 000513 }else{ 000514 nRead = (int)(p->iEof - p->iReadOff); 000515 } 000516 assert( nRead>0 ); 000517 000518 /* Readr data from the file. Return early if an error occurs. */ 000519 rc = sqlite3OsRead(p->pFd, p->aBuffer, nRead, p->iReadOff); 000520 assert( rc!=SQLITE_IOERR_SHORT_READ ); 000521 if( rc!=SQLITE_OK ) return rc; 000522 } 000523 nAvail = p->nBuffer - iBuf; 000524 000525 if( nByte<=nAvail ){ 000526 /* The requested data is available in the in-memory buffer. In this 000527 ** case there is no need to make a copy of the data, just return a 000528 ** pointer into the buffer to the caller. */ 000529 *ppOut = &p->aBuffer[iBuf]; 000530 p->iReadOff += nByte; 000531 }else{ 000532 /* The requested data is not all available in the in-memory buffer. 000533 ** In this case, allocate space at p->aAlloc[] to copy the requested 000534 ** range into. Then return a copy of pointer p->aAlloc to the caller. */ 000535 int nRem; /* Bytes remaining to copy */ 000536 000537 /* Extend the p->aAlloc[] allocation if required. */ 000538 if( p->nAlloc<nByte ){ 000539 u8 *aNew; 000540 int nNew = MAX(128, p->nAlloc*2); 000541 while( nByte>nNew ) nNew = nNew*2; 000542 aNew = sqlite3Realloc(p->aAlloc, nNew); 000543 if( !aNew ) return SQLITE_NOMEM_BKPT; 000544 p->nAlloc = nNew; 000545 p->aAlloc = aNew; 000546 } 000547 000548 /* Copy as much data as is available in the buffer into the start of 000549 ** p->aAlloc[]. */ 000550 memcpy(p->aAlloc, &p->aBuffer[iBuf], nAvail); 000551 p->iReadOff += nAvail; 000552 nRem = nByte - nAvail; 000553 000554 /* The following loop copies up to p->nBuffer bytes per iteration into 000555 ** the p->aAlloc[] buffer. 
*/ 000556 while( nRem>0 ){ 000557 int rc; /* vdbePmaReadBlob() return code */ 000558 int nCopy; /* Number of bytes to copy */ 000559 u8 *aNext; /* Pointer to buffer to copy data from */ 000560 000561 nCopy = nRem; 000562 if( nRem>p->nBuffer ) nCopy = p->nBuffer; 000563 rc = vdbePmaReadBlob(p, nCopy, &aNext); 000564 if( rc!=SQLITE_OK ) return rc; 000565 assert( aNext!=p->aAlloc ); 000566 memcpy(&p->aAlloc[nByte - nRem], aNext, nCopy); 000567 nRem -= nCopy; 000568 } 000569 000570 *ppOut = p->aAlloc; 000571 } 000572 000573 return SQLITE_OK; 000574 } 000575 000576 /* 000577 ** Read a varint from the stream of data accessed by p. Set *pnOut to 000578 ** the value read. 000579 */ 000580 static int vdbePmaReadVarint(PmaReader *p, u64 *pnOut){ 000581 int iBuf; 000582 000583 if( p->aMap ){ 000584 p->iReadOff += sqlite3GetVarint(&p->aMap[p->iReadOff], pnOut); 000585 }else{ 000586 iBuf = p->iReadOff % p->nBuffer; 000587 if( iBuf && (p->nBuffer-iBuf)>=9 ){ 000588 p->iReadOff += sqlite3GetVarint(&p->aBuffer[iBuf], pnOut); 000589 }else{ 000590 u8 aVarint[16], *a; 000591 int i = 0, rc; 000592 do{ 000593 rc = vdbePmaReadBlob(p, 1, &a); 000594 if( rc ) return rc; 000595 aVarint[(i++)&0xf] = a[0]; 000596 }while( (a[0]&0x80)!=0 ); 000597 sqlite3GetVarint(aVarint, pnOut); 000598 } 000599 } 000600 000601 return SQLITE_OK; 000602 } 000603 000604 /* 000605 ** Attempt to memory map file pFile. If successful, set *pp to point to the 000606 ** new mapping and return SQLITE_OK. If the mapping is not attempted 000607 ** (because the file is too large or the VFS layer is configured not to use 000608 ** mmap), return SQLITE_OK and set *pp to NULL. 000609 ** 000610 ** Or, if an error occurs, return an SQLite error code. The final value of 000611 ** *pp is undefined in this case. 
000612 */ 000613 static int vdbeSorterMapFile(SortSubtask *pTask, SorterFile *pFile, u8 **pp){ 000614 int rc = SQLITE_OK; 000615 if( pFile->iEof<=(i64)(pTask->pSorter->db->nMaxSorterMmap) ){ 000616 sqlite3_file *pFd = pFile->pFd; 000617 if( pFd->pMethods->iVersion>=3 ){ 000618 rc = sqlite3OsFetch(pFd, 0, (int)pFile->iEof, (void**)pp); 000619 testcase( rc!=SQLITE_OK ); 000620 } 000621 } 000622 return rc; 000623 } 000624 000625 /* 000626 ** Attach PmaReader pReadr to file pFile (if it is not already attached to 000627 ** that file) and seek it to offset iOff within the file. Return SQLITE_OK 000628 ** if successful, or an SQLite error code if an error occurs. 000629 */ 000630 static int vdbePmaReaderSeek( 000631 SortSubtask *pTask, /* Task context */ 000632 PmaReader *pReadr, /* Reader whose cursor is to be moved */ 000633 SorterFile *pFile, /* Sorter file to read from */ 000634 i64 iOff /* Offset in pFile */ 000635 ){ 000636 int rc = SQLITE_OK; 000637 000638 assert( pReadr->pIncr==0 || pReadr->pIncr->bEof==0 ); 000639 000640 if( sqlite3FaultSim(201) ) return SQLITE_IOERR_READ; 000641 if( pReadr->aMap ){ 000642 sqlite3OsUnfetch(pReadr->pFd, 0, pReadr->aMap); 000643 pReadr->aMap = 0; 000644 } 000645 pReadr->iReadOff = iOff; 000646 pReadr->iEof = pFile->iEof; 000647 pReadr->pFd = pFile->pFd; 000648 000649 rc = vdbeSorterMapFile(pTask, pFile, &pReadr->aMap); 000650 if( rc==SQLITE_OK && pReadr->aMap==0 ){ 000651 int pgsz = pTask->pSorter->pgsz; 000652 int iBuf = pReadr->iReadOff % pgsz; 000653 if( pReadr->aBuffer==0 ){ 000654 pReadr->aBuffer = (u8*)sqlite3Malloc(pgsz); 000655 if( pReadr->aBuffer==0 ) rc = SQLITE_NOMEM_BKPT; 000656 pReadr->nBuffer = pgsz; 000657 } 000658 if( rc==SQLITE_OK && iBuf ){ 000659 int nRead = pgsz - iBuf; 000660 if( (pReadr->iReadOff + nRead) > pReadr->iEof ){ 000661 nRead = (int)(pReadr->iEof - pReadr->iReadOff); 000662 } 000663 rc = sqlite3OsRead( 000664 pReadr->pFd, &pReadr->aBuffer[iBuf], nRead, pReadr->iReadOff 000665 ); 000666 testcase( 
rc!=SQLITE_OK ); 000667 } 000668 } 000669 000670 return rc; 000671 } 000672 000673 /* 000674 ** Advance PmaReader pReadr to the next key in its PMA. Return SQLITE_OK if 000675 ** no error occurs, or an SQLite error code if one does. 000676 */ 000677 static int vdbePmaReaderNext(PmaReader *pReadr){ 000678 int rc = SQLITE_OK; /* Return Code */ 000679 u64 nRec = 0; /* Size of record in bytes */ 000680 000681 000682 if( pReadr->iReadOff>=pReadr->iEof ){ 000683 IncrMerger *pIncr = pReadr->pIncr; 000684 int bEof = 1; 000685 if( pIncr ){ 000686 rc = vdbeIncrSwap(pIncr); 000687 if( rc==SQLITE_OK && pIncr->bEof==0 ){ 000688 rc = vdbePmaReaderSeek( 000689 pIncr->pTask, pReadr, &pIncr->aFile[0], pIncr->iStartOff 000690 ); 000691 bEof = 0; 000692 } 000693 } 000694 000695 if( bEof ){ 000696 /* This is an EOF condition */ 000697 vdbePmaReaderClear(pReadr); 000698 testcase( rc!=SQLITE_OK ); 000699 return rc; 000700 } 000701 } 000702 000703 if( rc==SQLITE_OK ){ 000704 rc = vdbePmaReadVarint(pReadr, &nRec); 000705 } 000706 if( rc==SQLITE_OK ){ 000707 pReadr->nKey = (int)nRec; 000708 rc = vdbePmaReadBlob(pReadr, (int)nRec, &pReadr->aKey); 000709 testcase( rc!=SQLITE_OK ); 000710 } 000711 000712 return rc; 000713 } 000714 000715 /* 000716 ** Initialize PmaReader pReadr to scan through the PMA stored in file pFile 000717 ** starting at offset iStart and ending at offset iEof-1. This function 000718 ** leaves the PmaReader pointing to the first key in the PMA (or EOF if the 000719 ** PMA is empty). 000720 ** 000721 ** If the pnByte parameter is NULL, then it is assumed that the file 000722 ** contains a single PMA, and that that PMA omits the initial length varint. 
000723 */ 000724 static int vdbePmaReaderInit( 000725 SortSubtask *pTask, /* Task context */ 000726 SorterFile *pFile, /* Sorter file to read from */ 000727 i64 iStart, /* Start offset in pFile */ 000728 PmaReader *pReadr, /* PmaReader to populate */ 000729 i64 *pnByte /* IN/OUT: Increment this value by PMA size */ 000730 ){ 000731 int rc; 000732 000733 assert( pFile->iEof>iStart ); 000734 assert( pReadr->aAlloc==0 && pReadr->nAlloc==0 ); 000735 assert( pReadr->aBuffer==0 ); 000736 assert( pReadr->aMap==0 ); 000737 000738 rc = vdbePmaReaderSeek(pTask, pReadr, pFile, iStart); 000739 if( rc==SQLITE_OK ){ 000740 u64 nByte = 0; /* Size of PMA in bytes */ 000741 rc = vdbePmaReadVarint(pReadr, &nByte); 000742 pReadr->iEof = pReadr->iReadOff + nByte; 000743 *pnByte += nByte; 000744 } 000745 000746 if( rc==SQLITE_OK ){ 000747 rc = vdbePmaReaderNext(pReadr); 000748 } 000749 return rc; 000750 } 000751 000752 /* 000753 ** A version of vdbeSorterCompare() that assumes that it has already been 000754 ** determined that the first field of key1 is equal to the first field of 000755 ** key2. 000756 */ 000757 static int vdbeSorterCompareTail( 000758 SortSubtask *pTask, /* Subtask context (for pKeyInfo) */ 000759 int *pbKey2Cached, /* True if pTask->pUnpacked is pKey2 */ 000760 const void *pKey1, int nKey1, /* Left side of comparison */ 000761 const void *pKey2, int nKey2 /* Right side of comparison */ 000762 ){ 000763 UnpackedRecord *r2 = pTask->pUnpacked; 000764 if( *pbKey2Cached==0 ){ 000765 sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2); 000766 *pbKey2Cached = 1; 000767 } 000768 return sqlite3VdbeRecordCompareWithSkip(nKey1, pKey1, r2, 1); 000769 } 000770 000771 /* 000772 ** Compare key1 (buffer pKey1, size nKey1 bytes) with key2 (buffer pKey2, 000773 ** size nKey2 bytes). Use (pTask->pKeyInfo) for the collation sequences 000774 ** used by the comparison. Return the result of the comparison. 
000775 ** 000776 ** If IN/OUT parameter *pbKey2Cached is true when this function is called, 000777 ** it is assumed that (pTask->pUnpacked) contains the unpacked version 000778 ** of key2. If it is false, (pTask->pUnpacked) is populated with the unpacked 000779 ** version of key2 and *pbKey2Cached set to true before returning. 000780 ** 000781 ** If an OOM error is encountered, (pTask->pUnpacked->error_rc) is set 000782 ** to SQLITE_NOMEM. 000783 */ 000784 static int vdbeSorterCompare( 000785 SortSubtask *pTask, /* Subtask context (for pKeyInfo) */ 000786 int *pbKey2Cached, /* True if pTask->pUnpacked is pKey2 */ 000787 const void *pKey1, int nKey1, /* Left side of comparison */ 000788 const void *pKey2, int nKey2 /* Right side of comparison */ 000789 ){ 000790 UnpackedRecord *r2 = pTask->pUnpacked; 000791 if( !*pbKey2Cached ){ 000792 sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2); 000793 *pbKey2Cached = 1; 000794 } 000795 return sqlite3VdbeRecordCompare(nKey1, pKey1, r2); 000796 } 000797 000798 /* 000799 ** A specially optimized version of vdbeSorterCompare() that assumes that 000800 ** the first field of each key is a TEXT value and that the collation 000801 ** sequence to compare them with is BINARY. 
000802 */ 000803 static int vdbeSorterCompareText( 000804 SortSubtask *pTask, /* Subtask context (for pKeyInfo) */ 000805 int *pbKey2Cached, /* True if pTask->pUnpacked is pKey2 */ 000806 const void *pKey1, int nKey1, /* Left side of comparison */ 000807 const void *pKey2, int nKey2 /* Right side of comparison */ 000808 ){ 000809 const u8 * const p1 = (const u8 * const)pKey1; 000810 const u8 * const p2 = (const u8 * const)pKey2; 000811 const u8 * const v1 = &p1[ p1[0] ]; /* Pointer to value 1 */ 000812 const u8 * const v2 = &p2[ p2[0] ]; /* Pointer to value 2 */ 000813 000814 int n1; 000815 int n2; 000816 int res; 000817 000818 getVarint32(&p1[1], n1); n1 = (n1 - 13) / 2; 000819 getVarint32(&p2[1], n2); n2 = (n2 - 13) / 2; 000820 res = memcmp(v1, v2, MIN(n1, n2)); 000821 if( res==0 ){ 000822 res = n1 - n2; 000823 } 000824 000825 if( res==0 ){ 000826 if( pTask->pSorter->pKeyInfo->nField>1 ){ 000827 res = vdbeSorterCompareTail( 000828 pTask, pbKey2Cached, pKey1, nKey1, pKey2, nKey2 000829 ); 000830 } 000831 }else{ 000832 if( pTask->pSorter->pKeyInfo->aSortOrder[0] ){ 000833 res = res * -1; 000834 } 000835 } 000836 000837 return res; 000838 } 000839 000840 /* 000841 ** A specially optimized version of vdbeSorterCompare() that assumes that 000842 ** the first field of each key is an INTEGER value. 
000843 */ 000844 static int vdbeSorterCompareInt( 000845 SortSubtask *pTask, /* Subtask context (for pKeyInfo) */ 000846 int *pbKey2Cached, /* True if pTask->pUnpacked is pKey2 */ 000847 const void *pKey1, int nKey1, /* Left side of comparison */ 000848 const void *pKey2, int nKey2 /* Right side of comparison */ 000849 ){ 000850 const u8 * const p1 = (const u8 * const)pKey1; 000851 const u8 * const p2 = (const u8 * const)pKey2; 000852 const int s1 = p1[1]; /* Left hand serial type */ 000853 const int s2 = p2[1]; /* Right hand serial type */ 000854 const u8 * const v1 = &p1[ p1[0] ]; /* Pointer to value 1 */ 000855 const u8 * const v2 = &p2[ p2[0] ]; /* Pointer to value 2 */ 000856 int res; /* Return value */ 000857 000858 assert( (s1>0 && s1<7) || s1==8 || s1==9 ); 000859 assert( (s2>0 && s2<7) || s2==8 || s2==9 ); 000860 000861 if( s1>7 && s2>7 ){ 000862 res = s1 - s2; 000863 }else{ 000864 if( s1==s2 ){ 000865 if( (*v1 ^ *v2) & 0x80 ){ 000866 /* The two values have different signs */ 000867 res = (*v1 & 0x80) ? -1 : +1; 000868 }else{ 000869 /* The two values have the same sign. Compare using memcmp(). 
*/ 000870 static const u8 aLen[] = {0, 1, 2, 3, 4, 6, 8 }; 000871 int i; 000872 res = 0; 000873 for(i=0; i<aLen[s1]; i++){ 000874 if( (res = v1[i] - v2[i]) ) break; 000875 } 000876 } 000877 }else{ 000878 if( s2>7 ){ 000879 res = +1; 000880 }else if( s1>7 ){ 000881 res = -1; 000882 }else{ 000883 res = s1 - s2; 000884 } 000885 assert( res!=0 ); 000886 000887 if( res>0 ){ 000888 if( *v1 & 0x80 ) res = -1; 000889 }else{ 000890 if( *v2 & 0x80 ) res = +1; 000891 } 000892 } 000893 } 000894 000895 if( res==0 ){ 000896 if( pTask->pSorter->pKeyInfo->nField>1 ){ 000897 res = vdbeSorterCompareTail( 000898 pTask, pbKey2Cached, pKey1, nKey1, pKey2, nKey2 000899 ); 000900 } 000901 }else if( pTask->pSorter->pKeyInfo->aSortOrder[0] ){ 000902 res = res * -1; 000903 } 000904 000905 return res; 000906 } 000907 000908 /* 000909 ** Initialize the temporary index cursor just opened as a sorter cursor. 000910 ** 000911 ** Usually, the sorter module uses the value of (pCsr->pKeyInfo->nField) 000912 ** to determine the number of fields that should be compared from the 000913 ** records being sorted. However, if the value passed as argument nField 000914 ** is non-zero and the sorter is able to guarantee a stable sort, nField 000915 ** is used instead. This is used when sorting records for a CREATE INDEX 000916 ** statement. In this case, keys are always delivered to the sorter in 000917 ** order of the primary key, which happens to be make up the final part 000918 ** of the records being sorted. So if the sort is stable, there is never 000919 ** any reason to compare PK fields and they can be ignored for a small 000920 ** performance boost. 000921 ** 000922 ** The sorter can guarantee a stable sort when running in single-threaded 000923 ** mode, but not in multi-threaded mode. 000924 ** 000925 ** SQLITE_OK is returned if successful, or an SQLite error code otherwise. 
000926 */ 000927 int sqlite3VdbeSorterInit( 000928 sqlite3 *db, /* Database connection (for malloc()) */ 000929 int nField, /* Number of key fields in each record */ 000930 VdbeCursor *pCsr /* Cursor that holds the new sorter */ 000931 ){ 000932 int pgsz; /* Page size of main database */ 000933 int i; /* Used to iterate through aTask[] */ 000934 VdbeSorter *pSorter; /* The new sorter */ 000935 KeyInfo *pKeyInfo; /* Copy of pCsr->pKeyInfo with db==0 */ 000936 int szKeyInfo; /* Size of pCsr->pKeyInfo in bytes */ 000937 int sz; /* Size of pSorter in bytes */ 000938 int rc = SQLITE_OK; 000939 #if SQLITE_MAX_WORKER_THREADS==0 000940 # define nWorker 0 000941 #else 000942 int nWorker; 000943 #endif 000944 000945 /* Initialize the upper limit on the number of worker threads */ 000946 #if SQLITE_MAX_WORKER_THREADS>0 000947 if( sqlite3TempInMemory(db) || sqlite3GlobalConfig.bCoreMutex==0 ){ 000948 nWorker = 0; 000949 }else{ 000950 nWorker = db->aLimit[SQLITE_LIMIT_WORKER_THREADS]; 000951 } 000952 #endif 000953 000954 /* Do not allow the total number of threads (main thread + all workers) 000955 ** to exceed the maximum merge count */ 000956 #if SQLITE_MAX_WORKER_THREADS>=SORTER_MAX_MERGE_COUNT 000957 if( nWorker>=SORTER_MAX_MERGE_COUNT ){ 000958 nWorker = SORTER_MAX_MERGE_COUNT-1; 000959 } 000960 #endif 000961 000962 assert( pCsr->pKeyInfo && pCsr->pBtx==0 ); 000963 assert( pCsr->eCurType==CURTYPE_SORTER ); 000964 szKeyInfo = sizeof(KeyInfo) + (pCsr->pKeyInfo->nField-1)*sizeof(CollSeq*); 000965 sz = sizeof(VdbeSorter) + nWorker * sizeof(SortSubtask); 000966 000967 pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sz + szKeyInfo); 000968 pCsr->uc.pSorter = pSorter; 000969 if( pSorter==0 ){ 000970 rc = SQLITE_NOMEM_BKPT; 000971 }else{ 000972 pSorter->pKeyInfo = pKeyInfo = (KeyInfo*)((u8*)pSorter + sz); 000973 memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo); 000974 pKeyInfo->db = 0; 000975 if( nField && nWorker==0 ){ 000976 pKeyInfo->nXField += (pKeyInfo->nField - nField); 000977 
pKeyInfo->nField = nField; 000978 } 000979 pSorter->pgsz = pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt); 000980 pSorter->nTask = nWorker + 1; 000981 pSorter->iPrev = (u8)(nWorker - 1); 000982 pSorter->bUseThreads = (pSorter->nTask>1); 000983 pSorter->db = db; 000984 for(i=0; i<pSorter->nTask; i++){ 000985 SortSubtask *pTask = &pSorter->aTask[i]; 000986 pTask->pSorter = pSorter; 000987 } 000988 000989 if( !sqlite3TempInMemory(db) ){ 000990 i64 mxCache; /* Cache size in bytes*/ 000991 u32 szPma = sqlite3GlobalConfig.szPma; 000992 pSorter->mnPmaSize = szPma * pgsz; 000993 000994 mxCache = db->aDb[0].pSchema->cache_size; 000995 if( mxCache<0 ){ 000996 /* A negative cache-size value C indicates that the cache is abs(C) 000997 ** KiB in size. */ 000998 mxCache = mxCache * -1024; 000999 }else{ 001000 mxCache = mxCache * pgsz; 001001 } 001002 mxCache = MIN(mxCache, SQLITE_MAX_PMASZ); 001003 pSorter->mxPmaSize = MAX(pSorter->mnPmaSize, (int)mxCache); 001004 001005 /* EVIDENCE-OF: R-26747-61719 When the application provides any amount of 001006 ** scratch memory using SQLITE_CONFIG_SCRATCH, SQLite avoids unnecessary 001007 ** large heap allocations. 001008 */ 001009 if( sqlite3GlobalConfig.pScratch==0 ){ 001010 assert( pSorter->iMemory==0 ); 001011 pSorter->nMemory = pgsz; 001012 pSorter->list.aMemory = (u8*)sqlite3Malloc(pgsz); 001013 if( !pSorter->list.aMemory ) rc = SQLITE_NOMEM_BKPT; 001014 } 001015 } 001016 001017 if( (pKeyInfo->nField+pKeyInfo->nXField)<13 001018 && (pKeyInfo->aColl[0]==0 || pKeyInfo->aColl[0]==db->pDfltColl) 001019 ){ 001020 pSorter->typeMask = SORTER_TYPE_INTEGER | SORTER_TYPE_TEXT; 001021 } 001022 } 001023 001024 return rc; 001025 } 001026 #undef nWorker /* Defined at the top of this function */ 001027 001028 /* 001029 ** Free the list of sorted records starting at pRecord. 
001030 */ 001031 static void vdbeSorterRecordFree(sqlite3 *db, SorterRecord *pRecord){ 001032 SorterRecord *p; 001033 SorterRecord *pNext; 001034 for(p=pRecord; p; p=pNext){ 001035 pNext = p->u.pNext; 001036 sqlite3DbFree(db, p); 001037 } 001038 } 001039 001040 /* 001041 ** Free all resources owned by the object indicated by argument pTask. All 001042 ** fields of *pTask are zeroed before returning. 001043 */ 001044 static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){ 001045 sqlite3DbFree(db, pTask->pUnpacked); 001046 #if SQLITE_MAX_WORKER_THREADS>0 001047 /* pTask->list.aMemory can only be non-zero if it was handed memory 001048 ** from the main thread. That only occurs SQLITE_MAX_WORKER_THREADS>0 */ 001049 if( pTask->list.aMemory ){ 001050 sqlite3_free(pTask->list.aMemory); 001051 }else 001052 #endif 001053 { 001054 assert( pTask->list.aMemory==0 ); 001055 vdbeSorterRecordFree(0, pTask->list.pList); 001056 } 001057 if( pTask->file.pFd ){ 001058 sqlite3OsCloseFree(pTask->file.pFd); 001059 } 001060 if( pTask->file2.pFd ){ 001061 sqlite3OsCloseFree(pTask->file2.pFd); 001062 } 001063 memset(pTask, 0, sizeof(SortSubtask)); 001064 } 001065 001066 #ifdef SQLITE_DEBUG_SORTER_THREADS 001067 static void vdbeSorterWorkDebug(SortSubtask *pTask, const char *zEvent){ 001068 i64 t; 001069 int iTask = (pTask - pTask->pSorter->aTask); 001070 sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t); 001071 fprintf(stderr, "%lld:%d %s\n", t, iTask, zEvent); 001072 } 001073 static void vdbeSorterRewindDebug(const char *zEvent){ 001074 i64 t; 001075 sqlite3OsCurrentTimeInt64(sqlite3_vfs_find(0), &t); 001076 fprintf(stderr, "%lld:X %s\n", t, zEvent); 001077 } 001078 static void vdbeSorterPopulateDebug( 001079 SortSubtask *pTask, 001080 const char *zEvent 001081 ){ 001082 i64 t; 001083 int iTask = (pTask - pTask->pSorter->aTask); 001084 sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t); 001085 fprintf(stderr, "%lld:bg%d %s\n", t, iTask, zEvent); 001086 } 001087 
static void vdbeSorterBlockDebug( 001088 SortSubtask *pTask, 001089 int bBlocked, 001090 const char *zEvent 001091 ){ 001092 if( bBlocked ){ 001093 i64 t; 001094 sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t); 001095 fprintf(stderr, "%lld:main %s\n", t, zEvent); 001096 } 001097 } 001098 #else 001099 # define vdbeSorterWorkDebug(x,y) 001100 # define vdbeSorterRewindDebug(y) 001101 # define vdbeSorterPopulateDebug(x,y) 001102 # define vdbeSorterBlockDebug(x,y,z) 001103 #endif 001104 001105 #if SQLITE_MAX_WORKER_THREADS>0 001106 /* 001107 ** Join thread pTask->thread. 001108 */ 001109 static int vdbeSorterJoinThread(SortSubtask *pTask){ 001110 int rc = SQLITE_OK; 001111 if( pTask->pThread ){ 001112 #ifdef SQLITE_DEBUG_SORTER_THREADS 001113 int bDone = pTask->bDone; 001114 #endif 001115 void *pRet = SQLITE_INT_TO_PTR(SQLITE_ERROR); 001116 vdbeSorterBlockDebug(pTask, !bDone, "enter"); 001117 (void)sqlite3ThreadJoin(pTask->pThread, &pRet); 001118 vdbeSorterBlockDebug(pTask, !bDone, "exit"); 001119 rc = SQLITE_PTR_TO_INT(pRet); 001120 assert( pTask->bDone==1 ); 001121 pTask->bDone = 0; 001122 pTask->pThread = 0; 001123 } 001124 return rc; 001125 } 001126 001127 /* 001128 ** Launch a background thread to run xTask(pIn). 001129 */ 001130 static int vdbeSorterCreateThread( 001131 SortSubtask *pTask, /* Thread will use this task object */ 001132 void *(*xTask)(void*), /* Routine to run in a separate thread */ 001133 void *pIn /* Argument passed into xTask() */ 001134 ){ 001135 assert( pTask->pThread==0 && pTask->bDone==0 ); 001136 return sqlite3ThreadCreate(&pTask->pThread, xTask, pIn); 001137 } 001138 001139 /* 001140 ** Join all outstanding threads launched by SorterWrite() to create 001141 ** level-0 PMAs. 001142 */ 001143 static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){ 001144 int rc = rcin; 001145 int i; 001146 001147 /* This function is always called by the main user thread. 
001148 ** 001149 ** If this function is being called after SorterRewind() has been called, 001150 ** it is possible that thread pSorter->aTask[pSorter->nTask-1].pThread 001151 ** is currently attempt to join one of the other threads. To avoid a race 001152 ** condition where this thread also attempts to join the same object, join 001153 ** thread pSorter->aTask[pSorter->nTask-1].pThread first. */ 001154 for(i=pSorter->nTask-1; i>=0; i--){ 001155 SortSubtask *pTask = &pSorter->aTask[i]; 001156 int rc2 = vdbeSorterJoinThread(pTask); 001157 if( rc==SQLITE_OK ) rc = rc2; 001158 } 001159 return rc; 001160 } 001161 #else 001162 # define vdbeSorterJoinAll(x,rcin) (rcin) 001163 # define vdbeSorterJoinThread(pTask) SQLITE_OK 001164 #endif 001165 001166 /* 001167 ** Allocate a new MergeEngine object capable of handling up to 001168 ** nReader PmaReader inputs. 001169 ** 001170 ** nReader is automatically rounded up to the next power of two. 001171 ** nReader may not exceed SORTER_MAX_MERGE_COUNT even after rounding up. 001172 */ 001173 static MergeEngine *vdbeMergeEngineNew(int nReader){ 001174 int N = 2; /* Smallest power of two >= nReader */ 001175 int nByte; /* Total bytes of space to allocate */ 001176 MergeEngine *pNew; /* Pointer to allocated object to return */ 001177 001178 assert( nReader<=SORTER_MAX_MERGE_COUNT ); 001179 001180 while( N<nReader ) N += N; 001181 nByte = sizeof(MergeEngine) + N * (sizeof(int) + sizeof(PmaReader)); 001182 001183 pNew = sqlite3FaultSim(100) ? 0 : (MergeEngine*)sqlite3MallocZero(nByte); 001184 if( pNew ){ 001185 pNew->nTree = N; 001186 pNew->pTask = 0; 001187 pNew->aReadr = (PmaReader*)&pNew[1]; 001188 pNew->aTree = (int*)&pNew->aReadr[N]; 001189 } 001190 return pNew; 001191 } 001192 001193 /* 001194 ** Free the MergeEngine object passed as the only argument. 
001195 */ 001196 static void vdbeMergeEngineFree(MergeEngine *pMerger){ 001197 int i; 001198 if( pMerger ){ 001199 for(i=0; i<pMerger->nTree; i++){ 001200 vdbePmaReaderClear(&pMerger->aReadr[i]); 001201 } 001202 } 001203 sqlite3_free(pMerger); 001204 } 001205 001206 /* 001207 ** Free all resources associated with the IncrMerger object indicated by 001208 ** the first argument. 001209 */ 001210 static void vdbeIncrFree(IncrMerger *pIncr){ 001211 if( pIncr ){ 001212 #if SQLITE_MAX_WORKER_THREADS>0 001213 if( pIncr->bUseThread ){ 001214 vdbeSorterJoinThread(pIncr->pTask); 001215 if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd); 001216 if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd); 001217 } 001218 #endif 001219 vdbeMergeEngineFree(pIncr->pMerger); 001220 sqlite3_free(pIncr); 001221 } 001222 } 001223 001224 /* 001225 ** Reset a sorting cursor back to its original empty state. 001226 */ 001227 void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){ 001228 int i; 001229 (void)vdbeSorterJoinAll(pSorter, SQLITE_OK); 001230 assert( pSorter->bUseThreads || pSorter->pReader==0 ); 001231 #if SQLITE_MAX_WORKER_THREADS>0 001232 if( pSorter->pReader ){ 001233 vdbePmaReaderClear(pSorter->pReader); 001234 sqlite3DbFree(db, pSorter->pReader); 001235 pSorter->pReader = 0; 001236 } 001237 #endif 001238 vdbeMergeEngineFree(pSorter->pMerger); 001239 pSorter->pMerger = 0; 001240 for(i=0; i<pSorter->nTask; i++){ 001241 SortSubtask *pTask = &pSorter->aTask[i]; 001242 vdbeSortSubtaskCleanup(db, pTask); 001243 pTask->pSorter = pSorter; 001244 } 001245 if( pSorter->list.aMemory==0 ){ 001246 vdbeSorterRecordFree(0, pSorter->list.pList); 001247 } 001248 pSorter->list.pList = 0; 001249 pSorter->list.szPMA = 0; 001250 pSorter->bUsePMA = 0; 001251 pSorter->iMemory = 0; 001252 pSorter->mxKeysize = 0; 001253 sqlite3DbFree(db, pSorter->pUnpacked); 001254 pSorter->pUnpacked = 0; 001255 } 001256 001257 /* 001258 ** Free any cursor components allocated by 
sqlite3VdbeSorterXXX routines. 001259 */ 001260 void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){ 001261 VdbeSorter *pSorter; 001262 assert( pCsr->eCurType==CURTYPE_SORTER ); 001263 pSorter = pCsr->uc.pSorter; 001264 if( pSorter ){ 001265 sqlite3VdbeSorterReset(db, pSorter); 001266 sqlite3_free(pSorter->list.aMemory); 001267 sqlite3DbFree(db, pSorter); 001268 pCsr->uc.pSorter = 0; 001269 } 001270 } 001271 001272 #if SQLITE_MAX_MMAP_SIZE>0 001273 /* 001274 ** The first argument is a file-handle open on a temporary file. The file 001275 ** is guaranteed to be nByte bytes or smaller in size. This function 001276 ** attempts to extend the file to nByte bytes in size and to ensure that 001277 ** the VFS has memory mapped it. 001278 ** 001279 ** Whether or not the file does end up memory mapped of course depends on 001280 ** the specific VFS implementation. 001281 */ 001282 static void vdbeSorterExtendFile(sqlite3 *db, sqlite3_file *pFd, i64 nByte){ 001283 if( nByte<=(i64)(db->nMaxSorterMmap) && pFd->pMethods->iVersion>=3 ){ 001284 void *p = 0; 001285 int chunksize = 4*1024; 001286 sqlite3OsFileControlHint(pFd, SQLITE_FCNTL_CHUNK_SIZE, &chunksize); 001287 sqlite3OsFileControlHint(pFd, SQLITE_FCNTL_SIZE_HINT, &nByte); 001288 sqlite3OsFetch(pFd, 0, (int)nByte, &p); 001289 sqlite3OsUnfetch(pFd, 0, p); 001290 } 001291 } 001292 #else 001293 # define vdbeSorterExtendFile(x,y,z) 001294 #endif 001295 001296 /* 001297 ** Allocate space for a file-handle and open a temporary file. If successful, 001298 ** set *ppFd to point to the malloc'd file-handle and return SQLITE_OK. 001299 ** Otherwise, set *ppFd to 0 and return an SQLite error code. 
001300 */ 001301 static int vdbeSorterOpenTempFile( 001302 sqlite3 *db, /* Database handle doing sort */ 001303 i64 nExtend, /* Attempt to extend file to this size */ 001304 sqlite3_file **ppFd 001305 ){ 001306 int rc; 001307 if( sqlite3FaultSim(202) ) return SQLITE_IOERR_ACCESS; 001308 rc = sqlite3OsOpenMalloc(db->pVfs, 0, ppFd, 001309 SQLITE_OPEN_TEMP_JOURNAL | 001310 SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | 001311 SQLITE_OPEN_EXCLUSIVE | SQLITE_OPEN_DELETEONCLOSE, &rc 001312 ); 001313 if( rc==SQLITE_OK ){ 001314 i64 max = SQLITE_MAX_MMAP_SIZE; 001315 sqlite3OsFileControlHint(*ppFd, SQLITE_FCNTL_MMAP_SIZE, (void*)&max); 001316 if( nExtend>0 ){ 001317 vdbeSorterExtendFile(db, *ppFd, nExtend); 001318 } 001319 } 001320 return rc; 001321 } 001322 001323 /* 001324 ** If it has not already been allocated, allocate the UnpackedRecord 001325 ** structure at pTask->pUnpacked. Return SQLITE_OK if successful (or 001326 ** if no allocation was required), or SQLITE_NOMEM otherwise. 001327 */ 001328 static int vdbeSortAllocUnpacked(SortSubtask *pTask){ 001329 if( pTask->pUnpacked==0 ){ 001330 pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pTask->pSorter->pKeyInfo); 001331 if( pTask->pUnpacked==0 ) return SQLITE_NOMEM_BKPT; 001332 pTask->pUnpacked->nField = pTask->pSorter->pKeyInfo->nField; 001333 pTask->pUnpacked->errCode = 0; 001334 } 001335 return SQLITE_OK; 001336 } 001337 001338 001339 /* 001340 ** Merge the two sorted lists p1 and p2 into a single list. 
001341 */ 001342 static SorterRecord *vdbeSorterMerge( 001343 SortSubtask *pTask, /* Calling thread context */ 001344 SorterRecord *p1, /* First list to merge */ 001345 SorterRecord *p2 /* Second list to merge */ 001346 ){ 001347 SorterRecord *pFinal = 0; 001348 SorterRecord **pp = &pFinal; 001349 int bCached = 0; 001350 001351 assert( p1!=0 && p2!=0 ); 001352 for(;;){ 001353 int res; 001354 res = pTask->xCompare( 001355 pTask, &bCached, SRVAL(p1), p1->nVal, SRVAL(p2), p2->nVal 001356 ); 001357 001358 if( res<=0 ){ 001359 *pp = p1; 001360 pp = &p1->u.pNext; 001361 p1 = p1->u.pNext; 001362 if( p1==0 ){ 001363 *pp = p2; 001364 break; 001365 } 001366 }else{ 001367 *pp = p2; 001368 pp = &p2->u.pNext; 001369 p2 = p2->u.pNext; 001370 bCached = 0; 001371 if( p2==0 ){ 001372 *pp = p1; 001373 break; 001374 } 001375 } 001376 } 001377 return pFinal; 001378 } 001379 001380 /* 001381 ** Return the SorterCompare function to compare values collected by the 001382 ** sorter object passed as the only argument. 001383 */ 001384 static SorterCompare vdbeSorterGetCompare(VdbeSorter *p){ 001385 if( p->typeMask==SORTER_TYPE_INTEGER ){ 001386 return vdbeSorterCompareInt; 001387 }else if( p->typeMask==SORTER_TYPE_TEXT ){ 001388 return vdbeSorterCompareText; 001389 } 001390 return vdbeSorterCompare; 001391 } 001392 001393 /* 001394 ** Sort the linked list of records headed at pTask->pList. Return 001395 ** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if 001396 ** an error occurs. 
001397 */ 001398 static int vdbeSorterSort(SortSubtask *pTask, SorterList *pList){ 001399 int i; 001400 SorterRecord **aSlot; 001401 SorterRecord *p; 001402 int rc; 001403 001404 rc = vdbeSortAllocUnpacked(pTask); 001405 if( rc!=SQLITE_OK ) return rc; 001406 001407 p = pList->pList; 001408 pTask->xCompare = vdbeSorterGetCompare(pTask->pSorter); 001409 001410 aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *)); 001411 if( !aSlot ){ 001412 return SQLITE_NOMEM_BKPT; 001413 } 001414 001415 while( p ){ 001416 SorterRecord *pNext; 001417 if( pList->aMemory ){ 001418 if( (u8*)p==pList->aMemory ){ 001419 pNext = 0; 001420 }else{ 001421 assert( p->u.iNext<sqlite3MallocSize(pList->aMemory) ); 001422 pNext = (SorterRecord*)&pList->aMemory[p->u.iNext]; 001423 } 001424 }else{ 001425 pNext = p->u.pNext; 001426 } 001427 001428 p->u.pNext = 0; 001429 for(i=0; aSlot[i]; i++){ 001430 p = vdbeSorterMerge(pTask, p, aSlot[i]); 001431 aSlot[i] = 0; 001432 } 001433 aSlot[i] = p; 001434 p = pNext; 001435 } 001436 001437 p = 0; 001438 for(i=0; i<64; i++){ 001439 if( aSlot[i]==0 ) continue; 001440 p = p ? vdbeSorterMerge(pTask, p, aSlot[i]) : aSlot[i]; 001441 } 001442 pList->pList = p; 001443 001444 sqlite3_free(aSlot); 001445 assert( pTask->pUnpacked->errCode==SQLITE_OK 001446 || pTask->pUnpacked->errCode==SQLITE_NOMEM 001447 ); 001448 return pTask->pUnpacked->errCode; 001449 } 001450 001451 /* 001452 ** Initialize a PMA-writer object. 
001453 */ 001454 static void vdbePmaWriterInit( 001455 sqlite3_file *pFd, /* File handle to write to */ 001456 PmaWriter *p, /* Object to populate */ 001457 int nBuf, /* Buffer size */ 001458 i64 iStart /* Offset of pFd to begin writing at */ 001459 ){ 001460 memset(p, 0, sizeof(PmaWriter)); 001461 p->aBuffer = (u8*)sqlite3Malloc(nBuf); 001462 if( !p->aBuffer ){ 001463 p->eFWErr = SQLITE_NOMEM_BKPT; 001464 }else{ 001465 p->iBufEnd = p->iBufStart = (iStart % nBuf); 001466 p->iWriteOff = iStart - p->iBufStart; 001467 p->nBuffer = nBuf; 001468 p->pFd = pFd; 001469 } 001470 } 001471 001472 /* 001473 ** Write nData bytes of data to the PMA. Return SQLITE_OK 001474 ** if successful, or an SQLite error code if an error occurs. 001475 */ 001476 static void vdbePmaWriteBlob(PmaWriter *p, u8 *pData, int nData){ 001477 int nRem = nData; 001478 while( nRem>0 && p->eFWErr==0 ){ 001479 int nCopy = nRem; 001480 if( nCopy>(p->nBuffer - p->iBufEnd) ){ 001481 nCopy = p->nBuffer - p->iBufEnd; 001482 } 001483 001484 memcpy(&p->aBuffer[p->iBufEnd], &pData[nData-nRem], nCopy); 001485 p->iBufEnd += nCopy; 001486 if( p->iBufEnd==p->nBuffer ){ 001487 p->eFWErr = sqlite3OsWrite(p->pFd, 001488 &p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart, 001489 p->iWriteOff + p->iBufStart 001490 ); 001491 p->iBufStart = p->iBufEnd = 0; 001492 p->iWriteOff += p->nBuffer; 001493 } 001494 assert( p->iBufEnd<p->nBuffer ); 001495 001496 nRem -= nCopy; 001497 } 001498 } 001499 001500 /* 001501 ** Flush any buffered data to disk and clean up the PMA-writer object. 001502 ** The results of using the PMA-writer after this call are undefined. 001503 ** Return SQLITE_OK if flushing the buffered data succeeds or is not 001504 ** required. Otherwise, return an SQLite error code. 001505 ** 001506 ** Before returning, set *piEof to the offset immediately following the 001507 ** last byte written to the file. 
001508 */ 001509 static int vdbePmaWriterFinish(PmaWriter *p, i64 *piEof){ 001510 int rc; 001511 if( p->eFWErr==0 && ALWAYS(p->aBuffer) && p->iBufEnd>p->iBufStart ){ 001512 p->eFWErr = sqlite3OsWrite(p->pFd, 001513 &p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart, 001514 p->iWriteOff + p->iBufStart 001515 ); 001516 } 001517 *piEof = (p->iWriteOff + p->iBufEnd); 001518 sqlite3_free(p->aBuffer); 001519 rc = p->eFWErr; 001520 memset(p, 0, sizeof(PmaWriter)); 001521 return rc; 001522 } 001523 001524 /* 001525 ** Write value iVal encoded as a varint to the PMA. Return 001526 ** SQLITE_OK if successful, or an SQLite error code if an error occurs. 001527 */ 001528 static void vdbePmaWriteVarint(PmaWriter *p, u64 iVal){ 001529 int nByte; 001530 u8 aByte[10]; 001531 nByte = sqlite3PutVarint(aByte, iVal); 001532 vdbePmaWriteBlob(p, aByte, nByte); 001533 } 001534 001535 /* 001536 ** Write the current contents of in-memory linked-list pList to a level-0 001537 ** PMA in the temp file belonging to sub-task pTask. Return SQLITE_OK if 001538 ** successful, or an SQLite error code otherwise. 001539 ** 001540 ** The format of a PMA is: 001541 ** 001542 ** * A varint. This varint contains the total number of bytes of content 001543 ** in the PMA (not including the varint itself). 001544 ** 001545 ** * One or more records packed end-to-end in order of ascending keys. 001546 ** Each record consists of a varint followed by a blob of data (the 001547 ** key). The varint is the number of bytes in the blob of data. 001548 */ 001549 static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){ 001550 sqlite3 *db = pTask->pSorter->db; 001551 int rc = SQLITE_OK; /* Return code */ 001552 PmaWriter writer; /* Object used to write to the file */ 001553 001554 #ifdef SQLITE_DEBUG 001555 /* Set iSz to the expected size of file pTask->file after writing the PMA. 001556 ** This is used by an assert() statement at the end of this function. 
*/ 001557 i64 iSz = pList->szPMA + sqlite3VarintLen(pList->szPMA) + pTask->file.iEof; 001558 #endif 001559 001560 vdbeSorterWorkDebug(pTask, "enter"); 001561 memset(&writer, 0, sizeof(PmaWriter)); 001562 assert( pList->szPMA>0 ); 001563 001564 /* If the first temporary PMA file has not been opened, open it now. */ 001565 if( pTask->file.pFd==0 ){ 001566 rc = vdbeSorterOpenTempFile(db, 0, &pTask->file.pFd); 001567 assert( rc!=SQLITE_OK || pTask->file.pFd ); 001568 assert( pTask->file.iEof==0 ); 001569 assert( pTask->nPMA==0 ); 001570 } 001571 001572 /* Try to get the file to memory map */ 001573 if( rc==SQLITE_OK ){ 001574 vdbeSorterExtendFile(db, pTask->file.pFd, pTask->file.iEof+pList->szPMA+9); 001575 } 001576 001577 /* Sort the list */ 001578 if( rc==SQLITE_OK ){ 001579 rc = vdbeSorterSort(pTask, pList); 001580 } 001581 001582 if( rc==SQLITE_OK ){ 001583 SorterRecord *p; 001584 SorterRecord *pNext = 0; 001585 001586 vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pSorter->pgsz, 001587 pTask->file.iEof); 001588 pTask->nPMA++; 001589 vdbePmaWriteVarint(&writer, pList->szPMA); 001590 for(p=pList->pList; p; p=pNext){ 001591 pNext = p->u.pNext; 001592 vdbePmaWriteVarint(&writer, p->nVal); 001593 vdbePmaWriteBlob(&writer, SRVAL(p), p->nVal); 001594 if( pList->aMemory==0 ) sqlite3_free(p); 001595 } 001596 pList->pList = p; 001597 rc = vdbePmaWriterFinish(&writer, &pTask->file.iEof); 001598 } 001599 001600 vdbeSorterWorkDebug(pTask, "exit"); 001601 assert( rc!=SQLITE_OK || pList->pList==0 ); 001602 assert( rc!=SQLITE_OK || pTask->file.iEof==iSz ); 001603 return rc; 001604 } 001605 001606 /* 001607 ** Advance the MergeEngine to its next entry. 001608 ** Set *pbEof to true there is no next entry because 001609 ** the MergeEngine has reached the end of all its inputs. 001610 ** 001611 ** Return SQLITE_OK if successful or an error code if an error occurs. 
001612 */ 001613 static int vdbeMergeEngineStep( 001614 MergeEngine *pMerger, /* The merge engine to advance to the next row */ 001615 int *pbEof /* Set TRUE at EOF. Set false for more content */ 001616 ){ 001617 int rc; 001618 int iPrev = pMerger->aTree[1];/* Index of PmaReader to advance */ 001619 SortSubtask *pTask = pMerger->pTask; 001620 001621 /* Advance the current PmaReader */ 001622 rc = vdbePmaReaderNext(&pMerger->aReadr[iPrev]); 001623 001624 /* Update contents of aTree[] */ 001625 if( rc==SQLITE_OK ){ 001626 int i; /* Index of aTree[] to recalculate */ 001627 PmaReader *pReadr1; /* First PmaReader to compare */ 001628 PmaReader *pReadr2; /* Second PmaReader to compare */ 001629 int bCached = 0; 001630 001631 /* Find the first two PmaReaders to compare. The one that was just 001632 ** advanced (iPrev) and the one next to it in the array. */ 001633 pReadr1 = &pMerger->aReadr[(iPrev & 0xFFFE)]; 001634 pReadr2 = &pMerger->aReadr[(iPrev | 0x0001)]; 001635 001636 for(i=(pMerger->nTree+iPrev)/2; i>0; i=i/2){ 001637 /* Compare pReadr1 and pReadr2. Store the result in variable iRes. */ 001638 int iRes; 001639 if( pReadr1->pFd==0 ){ 001640 iRes = +1; 001641 }else if( pReadr2->pFd==0 ){ 001642 iRes = -1; 001643 }else{ 001644 iRes = pTask->xCompare(pTask, &bCached, 001645 pReadr1->aKey, pReadr1->nKey, pReadr2->aKey, pReadr2->nKey 001646 ); 001647 } 001648 001649 /* If pReadr1 contained the smaller value, set aTree[i] to its index. 001650 ** Then set pReadr2 to the next PmaReader to compare to pReadr1. In this 001651 ** case there is no cache of pReadr2 in pTask->pUnpacked, so set 001652 ** pKey2 to point to the record belonging to pReadr2. 001653 ** 001654 ** Alternatively, if pReadr2 contains the smaller of the two values, 001655 ** set aTree[i] to its index and update pReadr1. If vdbeSorterCompare() 001656 ** was actually called above, then pTask->pUnpacked now contains 001657 ** a value equivalent to pReadr2. 
So set pKey2 to NULL to prevent 001658 ** vdbeSorterCompare() from decoding pReadr2 again. 001659 ** 001660 ** If the two values were equal, then the value from the oldest 001661 ** PMA should be considered smaller. The VdbeSorter.aReadr[] array 001662 ** is sorted from oldest to newest, so pReadr1 contains older values 001663 ** than pReadr2 iff (pReadr1<pReadr2). */ 001664 if( iRes<0 || (iRes==0 && pReadr1<pReadr2) ){ 001665 pMerger->aTree[i] = (int)(pReadr1 - pMerger->aReadr); 001666 pReadr2 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ]; 001667 bCached = 0; 001668 }else{ 001669 if( pReadr1->pFd ) bCached = 0; 001670 pMerger->aTree[i] = (int)(pReadr2 - pMerger->aReadr); 001671 pReadr1 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ]; 001672 } 001673 } 001674 *pbEof = (pMerger->aReadr[pMerger->aTree[1]].pFd==0); 001675 } 001676 001677 return (rc==SQLITE_OK ? pTask->pUnpacked->errCode : rc); 001678 } 001679 001680 #if SQLITE_MAX_WORKER_THREADS>0 001681 /* 001682 ** The main routine for background threads that write level-0 PMAs. 001683 */ 001684 static void *vdbeSorterFlushThread(void *pCtx){ 001685 SortSubtask *pTask = (SortSubtask*)pCtx; 001686 int rc; /* Return code */ 001687 assert( pTask->bDone==0 ); 001688 rc = vdbeSorterListToPMA(pTask, &pTask->list); 001689 pTask->bDone = 1; 001690 return SQLITE_INT_TO_PTR(rc); 001691 } 001692 #endif /* SQLITE_MAX_WORKER_THREADS>0 */ 001693 001694 /* 001695 ** Flush the current contents of VdbeSorter.list to a new PMA, possibly 001696 ** using a background thread. 001697 */ 001698 static int vdbeSorterFlushPMA(VdbeSorter *pSorter){ 001699 #if SQLITE_MAX_WORKER_THREADS==0 001700 pSorter->bUsePMA = 1; 001701 return vdbeSorterListToPMA(&pSorter->aTask[0], &pSorter->list); 001702 #else 001703 int rc = SQLITE_OK; 001704 int i; 001705 SortSubtask *pTask = 0; /* Thread context used to create new PMA */ 001706 int nWorker = (pSorter->nTask-1); 001707 001708 /* Set the flag to indicate that at least one PMA has been written. 
001709 ** Or will be, anyhow. */ 001710 pSorter->bUsePMA = 1; 001711 001712 /* Select a sub-task to sort and flush the current list of in-memory 001713 ** records to disk. If the sorter is running in multi-threaded mode, 001714 ** round-robin between the first (pSorter->nTask-1) tasks. Except, if 001715 ** the background thread from a sub-tasks previous turn is still running, 001716 ** skip it. If the first (pSorter->nTask-1) sub-tasks are all still busy, 001717 ** fall back to using the final sub-task. The first (pSorter->nTask-1) 001718 ** sub-tasks are prefered as they use background threads - the final 001719 ** sub-task uses the main thread. */ 001720 for(i=0; i<nWorker; i++){ 001721 int iTest = (pSorter->iPrev + i + 1) % nWorker; 001722 pTask = &pSorter->aTask[iTest]; 001723 if( pTask->bDone ){ 001724 rc = vdbeSorterJoinThread(pTask); 001725 } 001726 if( rc!=SQLITE_OK || pTask->pThread==0 ) break; 001727 } 001728 001729 if( rc==SQLITE_OK ){ 001730 if( i==nWorker ){ 001731 /* Use the foreground thread for this operation */ 001732 rc = vdbeSorterListToPMA(&pSorter->aTask[nWorker], &pSorter->list); 001733 }else{ 001734 /* Launch a background thread for this operation */ 001735 u8 *aMem = pTask->list.aMemory; 001736 void *pCtx = (void*)pTask; 001737 001738 assert( pTask->pThread==0 && pTask->bDone==0 ); 001739 assert( pTask->list.pList==0 ); 001740 assert( pTask->list.aMemory==0 || pSorter->list.aMemory!=0 ); 001741 001742 pSorter->iPrev = (u8)(pTask - pSorter->aTask); 001743 pTask->list = pSorter->list; 001744 pSorter->list.pList = 0; 001745 pSorter->list.szPMA = 0; 001746 if( aMem ){ 001747 pSorter->list.aMemory = aMem; 001748 pSorter->nMemory = sqlite3MallocSize(aMem); 001749 }else if( pSorter->list.aMemory ){ 001750 pSorter->list.aMemory = sqlite3Malloc(pSorter->nMemory); 001751 if( !pSorter->list.aMemory ) return SQLITE_NOMEM_BKPT; 001752 } 001753 001754 rc = vdbeSorterCreateThread(pTask, vdbeSorterFlushThread, pCtx); 001755 } 001756 } 001757 001758 return 
rc; 001759 #endif /* SQLITE_MAX_WORKER_THREADS!=0 */ 001760 } 001761 001762 /* 001763 ** Add a record to the sorter. 001764 */ 001765 int sqlite3VdbeSorterWrite( 001766 const VdbeCursor *pCsr, /* Sorter cursor */ 001767 Mem *pVal /* Memory cell containing record */ 001768 ){ 001769 VdbeSorter *pSorter; 001770 int rc = SQLITE_OK; /* Return Code */ 001771 SorterRecord *pNew; /* New list element */ 001772 int bFlush; /* True to flush contents of memory to PMA */ 001773 int nReq; /* Bytes of memory required */ 001774 int nPMA; /* Bytes of PMA space required */ 001775 int t; /* serial type of first record field */ 001776 001777 assert( pCsr->eCurType==CURTYPE_SORTER ); 001778 pSorter = pCsr->uc.pSorter; 001779 getVarint32((const u8*)&pVal->z[1], t); 001780 if( t>0 && t<10 && t!=7 ){ 001781 pSorter->typeMask &= SORTER_TYPE_INTEGER; 001782 }else if( t>10 && (t & 0x01) ){ 001783 pSorter->typeMask &= SORTER_TYPE_TEXT; 001784 }else{ 001785 pSorter->typeMask = 0; 001786 } 001787 001788 assert( pSorter ); 001789 001790 /* Figure out whether or not the current contents of memory should be 001791 ** flushed to a PMA before continuing. If so, do so. 001792 ** 001793 ** If using the single large allocation mode (pSorter->aMemory!=0), then 001794 ** flush the contents of memory to a new PMA if (a) at least one value is 001795 ** already in memory and (b) the new value will not fit in memory. 001796 ** 001797 ** Or, if using separate allocations for each record, flush the contents 001798 ** of memory to a PMA if either of the following are true: 001799 ** 001800 ** * The total memory allocated for the in-memory list is greater 001801 ** than (page-size * cache-size), or 001802 ** 001803 ** * The total memory allocated for the in-memory list is greater 001804 ** than (page-size * 10) and sqlite3HeapNearlyFull() returns true. 
001805 */ 001806 nReq = pVal->n + sizeof(SorterRecord); 001807 nPMA = pVal->n + sqlite3VarintLen(pVal->n); 001808 if( pSorter->mxPmaSize ){ 001809 if( pSorter->list.aMemory ){ 001810 bFlush = pSorter->iMemory && (pSorter->iMemory+nReq) > pSorter->mxPmaSize; 001811 }else{ 001812 bFlush = ( 001813 (pSorter->list.szPMA > pSorter->mxPmaSize) 001814 || (pSorter->list.szPMA > pSorter->mnPmaSize && sqlite3HeapNearlyFull()) 001815 ); 001816 } 001817 if( bFlush ){ 001818 rc = vdbeSorterFlushPMA(pSorter); 001819 pSorter->list.szPMA = 0; 001820 pSorter->iMemory = 0; 001821 assert( rc!=SQLITE_OK || pSorter->list.pList==0 ); 001822 } 001823 } 001824 001825 pSorter->list.szPMA += nPMA; 001826 if( nPMA>pSorter->mxKeysize ){ 001827 pSorter->mxKeysize = nPMA; 001828 } 001829 001830 if( pSorter->list.aMemory ){ 001831 int nMin = pSorter->iMemory + nReq; 001832 001833 if( nMin>pSorter->nMemory ){ 001834 u8 *aNew; 001835 int iListOff = (u8*)pSorter->list.pList - pSorter->list.aMemory; 001836 int nNew = pSorter->nMemory * 2; 001837 while( nNew < nMin ) nNew = nNew*2; 001838 if( nNew > pSorter->mxPmaSize ) nNew = pSorter->mxPmaSize; 001839 if( nNew < nMin ) nNew = nMin; 001840 001841 aNew = sqlite3Realloc(pSorter->list.aMemory, nNew); 001842 if( !aNew ) return SQLITE_NOMEM_BKPT; 001843 pSorter->list.pList = (SorterRecord*)&aNew[iListOff]; 001844 pSorter->list.aMemory = aNew; 001845 pSorter->nMemory = nNew; 001846 } 001847 001848 pNew = (SorterRecord*)&pSorter->list.aMemory[pSorter->iMemory]; 001849 pSorter->iMemory += ROUND8(nReq); 001850 if( pSorter->list.pList ){ 001851 pNew->u.iNext = (int)((u8*)(pSorter->list.pList) - pSorter->list.aMemory); 001852 } 001853 }else{ 001854 pNew = (SorterRecord *)sqlite3Malloc(nReq); 001855 if( pNew==0 ){ 001856 return SQLITE_NOMEM_BKPT; 001857 } 001858 pNew->u.pNext = pSorter->list.pList; 001859 } 001860 001861 memcpy(SRVAL(pNew), pVal->z, pVal->n); 001862 pNew->nVal = pVal->n; 001863 pSorter->list.pList = pNew; 001864 001865 return rc; 001866 } 
001867 001868 /* 001869 ** Read keys from pIncr->pMerger and populate pIncr->aFile[1]. The format 001870 ** of the data stored in aFile[1] is the same as that used by regular PMAs, 001871 ** except that the number-of-bytes varint is omitted from the start. 001872 */ 001873 static int vdbeIncrPopulate(IncrMerger *pIncr){ 001874 int rc = SQLITE_OK; 001875 int rc2; 001876 i64 iStart = pIncr->iStartOff; 001877 SorterFile *pOut = &pIncr->aFile[1]; 001878 SortSubtask *pTask = pIncr->pTask; 001879 MergeEngine *pMerger = pIncr->pMerger; 001880 PmaWriter writer; 001881 assert( pIncr->bEof==0 ); 001882 001883 vdbeSorterPopulateDebug(pTask, "enter"); 001884 001885 vdbePmaWriterInit(pOut->pFd, &writer, pTask->pSorter->pgsz, iStart); 001886 while( rc==SQLITE_OK ){ 001887 int dummy; 001888 PmaReader *pReader = &pMerger->aReadr[ pMerger->aTree[1] ]; 001889 int nKey = pReader->nKey; 001890 i64 iEof = writer.iWriteOff + writer.iBufEnd; 001891 001892 /* Check if the output file is full or if the input has been exhausted. 001893 ** In either case exit the loop. */ 001894 if( pReader->pFd==0 ) break; 001895 if( (iEof + nKey + sqlite3VarintLen(nKey))>(iStart + pIncr->mxSz) ) break; 001896 001897 /* Write the next key to the output. */ 001898 vdbePmaWriteVarint(&writer, nKey); 001899 vdbePmaWriteBlob(&writer, pReader->aKey, nKey); 001900 assert( pIncr->pMerger->pTask==pTask ); 001901 rc = vdbeMergeEngineStep(pIncr->pMerger, &dummy); 001902 } 001903 001904 rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof); 001905 if( rc==SQLITE_OK ) rc = rc2; 001906 vdbeSorterPopulateDebug(pTask, "exit"); 001907 return rc; 001908 } 001909 001910 #if SQLITE_MAX_WORKER_THREADS>0 001911 /* 001912 ** The main routine for background threads that populate aFile[1] of 001913 ** multi-threaded IncrMerger objects. 
001914 */ 001915 static void *vdbeIncrPopulateThread(void *pCtx){ 001916 IncrMerger *pIncr = (IncrMerger*)pCtx; 001917 void *pRet = SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) ); 001918 pIncr->pTask->bDone = 1; 001919 return pRet; 001920 } 001921 001922 /* 001923 ** Launch a background thread to populate aFile[1] of pIncr. 001924 */ 001925 static int vdbeIncrBgPopulate(IncrMerger *pIncr){ 001926 void *p = (void*)pIncr; 001927 assert( pIncr->bUseThread ); 001928 return vdbeSorterCreateThread(pIncr->pTask, vdbeIncrPopulateThread, p); 001929 } 001930 #endif 001931 001932 /* 001933 ** This function is called when the PmaReader corresponding to pIncr has 001934 ** finished reading the contents of aFile[0]. Its purpose is to "refill" 001935 ** aFile[0] such that the PmaReader should start rereading it from the 001936 ** beginning. 001937 ** 001938 ** For single-threaded objects, this is accomplished by literally reading 001939 ** keys from pIncr->pMerger and repopulating aFile[0]. 001940 ** 001941 ** For multi-threaded objects, all that is required is to wait until the 001942 ** background thread is finished (if it is not already) and then swap 001943 ** aFile[0] and aFile[1] in place. If the contents of pMerger have not 001944 ** been exhausted, this function also launches a new background thread 001945 ** to populate the new aFile[1]. 001946 ** 001947 ** SQLITE_OK is returned on success, or an SQLite error code otherwise. 
001948 */ 001949 static int vdbeIncrSwap(IncrMerger *pIncr){ 001950 int rc = SQLITE_OK; 001951 001952 #if SQLITE_MAX_WORKER_THREADS>0 001953 if( pIncr->bUseThread ){ 001954 rc = vdbeSorterJoinThread(pIncr->pTask); 001955 001956 if( rc==SQLITE_OK ){ 001957 SorterFile f0 = pIncr->aFile[0]; 001958 pIncr->aFile[0] = pIncr->aFile[1]; 001959 pIncr->aFile[1] = f0; 001960 } 001961 001962 if( rc==SQLITE_OK ){ 001963 if( pIncr->aFile[0].iEof==pIncr->iStartOff ){ 001964 pIncr->bEof = 1; 001965 }else{ 001966 rc = vdbeIncrBgPopulate(pIncr); 001967 } 001968 } 001969 }else 001970 #endif 001971 { 001972 rc = vdbeIncrPopulate(pIncr); 001973 pIncr->aFile[0] = pIncr->aFile[1]; 001974 if( pIncr->aFile[0].iEof==pIncr->iStartOff ){ 001975 pIncr->bEof = 1; 001976 } 001977 } 001978 001979 return rc; 001980 } 001981 001982 /* 001983 ** Allocate and return a new IncrMerger object to read data from pMerger. 001984 ** 001985 ** If an OOM condition is encountered, return NULL. In this case free the 001986 ** pMerger argument before returning. 001987 */ 001988 static int vdbeIncrMergerNew( 001989 SortSubtask *pTask, /* The thread that will be using the new IncrMerger */ 001990 MergeEngine *pMerger, /* The MergeEngine that the IncrMerger will control */ 001991 IncrMerger **ppOut /* Write the new IncrMerger here */ 001992 ){ 001993 int rc = SQLITE_OK; 001994 IncrMerger *pIncr = *ppOut = (IncrMerger*) 001995 (sqlite3FaultSim(100) ? 0 : sqlite3MallocZero(sizeof(*pIncr))); 001996 if( pIncr ){ 001997 pIncr->pMerger = pMerger; 001998 pIncr->pTask = pTask; 001999 pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2); 002000 pTask->file2.iEof += pIncr->mxSz; 002001 }else{ 002002 vdbeMergeEngineFree(pMerger); 002003 rc = SQLITE_NOMEM_BKPT; 002004 } 002005 return rc; 002006 } 002007 002008 #if SQLITE_MAX_WORKER_THREADS>0 002009 /* 002010 ** Set the "use-threads" flag on object pIncr. 
002011 */ 002012 static void vdbeIncrMergerSetThreads(IncrMerger *pIncr){ 002013 pIncr->bUseThread = 1; 002014 pIncr->pTask->file2.iEof -= pIncr->mxSz; 002015 } 002016 #endif /* SQLITE_MAX_WORKER_THREADS>0 */ 002017 002018 002019 002020 /* 002021 ** Recompute pMerger->aTree[iOut] by comparing the next keys on the 002022 ** two PmaReaders that feed that entry. Neither of the PmaReaders 002023 ** are advanced. This routine merely does the comparison. 002024 */ 002025 static void vdbeMergeEngineCompare( 002026 MergeEngine *pMerger, /* Merge engine containing PmaReaders to compare */ 002027 int iOut /* Store the result in pMerger->aTree[iOut] */ 002028 ){ 002029 int i1; 002030 int i2; 002031 int iRes; 002032 PmaReader *p1; 002033 PmaReader *p2; 002034 002035 assert( iOut<pMerger->nTree && iOut>0 ); 002036 002037 if( iOut>=(pMerger->nTree/2) ){ 002038 i1 = (iOut - pMerger->nTree/2) * 2; 002039 i2 = i1 + 1; 002040 }else{ 002041 i1 = pMerger->aTree[iOut*2]; 002042 i2 = pMerger->aTree[iOut*2+1]; 002043 } 002044 002045 p1 = &pMerger->aReadr[i1]; 002046 p2 = &pMerger->aReadr[i2]; 002047 002048 if( p1->pFd==0 ){ 002049 iRes = i2; 002050 }else if( p2->pFd==0 ){ 002051 iRes = i1; 002052 }else{ 002053 SortSubtask *pTask = pMerger->pTask; 002054 int bCached = 0; 002055 int res; 002056 assert( pTask->pUnpacked!=0 ); /* from vdbeSortSubtaskMain() */ 002057 res = pTask->xCompare( 002058 pTask, &bCached, p1->aKey, p1->nKey, p2->aKey, p2->nKey 002059 ); 002060 if( res<=0 ){ 002061 iRes = i1; 002062 }else{ 002063 iRes = i2; 002064 } 002065 } 002066 002067 pMerger->aTree[iOut] = iRes; 002068 } 002069 002070 /* 002071 ** Allowed values for the eMode parameter to vdbeMergeEngineInit() 002072 ** and vdbePmaReaderIncrMergeInit(). 002073 ** 002074 ** Only INCRINIT_NORMAL is valid in single-threaded builds (when 002075 ** SQLITE_MAX_WORKER_THREADS==0). The other values are only used 002076 ** when there exists one or more separate worker threads. 
*/
#define INCRINIT_NORMAL 0   /* Initialize synchronously in the calling thread */
#define INCRINIT_TASK   1   /* Running in a sub-task's background thread */
#define INCRINIT_ROOT   2   /* Root merger; children were set up with _TASK */

/*
** Forward reference. vdbeMergeEngineInit() calls vdbePmaReaderIncrInit(),
** which (via vdbePmaReaderIncrMergeInit()) calls vdbeMergeEngineInit()
** again, so the routines are mutually recursive when building a merge
** tree.
*/
static int vdbePmaReaderIncrInit(PmaReader *pReadr, int eMode);

/*
** Initialize the MergeEngine object passed as the second argument. Once this
** function returns, the first key of merged data may be read from the
** MergeEngine object in the usual fashion.
**
** If argument eMode is INCRINIT_ROOT, then it is assumed that any IncrMerge
** objects attached to the PmaReader objects that the merger reads from have
** already been populated, but that they have not yet populated aFile[0] and
** set the PmaReader objects up to read from it. In this case all that is
** required is to call vdbePmaReaderNext() on each PmaReader to point it at
** its first key.
**
** Otherwise, if eMode is any value other than INCRINIT_ROOT, then use
** vdbePmaReaderIncrMergeInit() to initialize each PmaReader that feeds data
** to pMerger.
**
** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
002106 */ 002107 static int vdbeMergeEngineInit( 002108 SortSubtask *pTask, /* Thread that will run pMerger */ 002109 MergeEngine *pMerger, /* MergeEngine to initialize */ 002110 int eMode /* One of the INCRINIT_XXX constants */ 002111 ){ 002112 int rc = SQLITE_OK; /* Return code */ 002113 int i; /* For looping over PmaReader objects */ 002114 int nTree = pMerger->nTree; 002115 002116 /* eMode is always INCRINIT_NORMAL in single-threaded mode */ 002117 assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL ); 002118 002119 /* Verify that the MergeEngine is assigned to a single thread */ 002120 assert( pMerger->pTask==0 ); 002121 pMerger->pTask = pTask; 002122 002123 for(i=0; i<nTree; i++){ 002124 if( SQLITE_MAX_WORKER_THREADS>0 && eMode==INCRINIT_ROOT ){ 002125 /* PmaReaders should be normally initialized in order, as if they are 002126 ** reading from the same temp file this makes for more linear file IO. 002127 ** However, in the INCRINIT_ROOT case, if PmaReader aReadr[nTask-1] is 002128 ** in use it will block the vdbePmaReaderNext() call while it uses 002129 ** the main thread to fill its buffer. So calling PmaReaderNext() 002130 ** on this PmaReader before any of the multi-threaded PmaReaders takes 002131 ** better advantage of multi-processor hardware. */ 002132 rc = vdbePmaReaderNext(&pMerger->aReadr[nTree-i-1]); 002133 }else{ 002134 rc = vdbePmaReaderIncrInit(&pMerger->aReadr[i], INCRINIT_NORMAL); 002135 } 002136 if( rc!=SQLITE_OK ) return rc; 002137 } 002138 002139 for(i=pMerger->nTree-1; i>0; i--){ 002140 vdbeMergeEngineCompare(pMerger, i); 002141 } 002142 return pTask->pUnpacked->errCode; 002143 } 002144 002145 /* 002146 ** The PmaReader passed as the first argument is guaranteed to be an 002147 ** incremental-reader (pReadr->pIncr!=0). This function serves to open 002148 ** and/or initialize the temp file related fields of the IncrMerge 002149 ** object at (pReadr->pIncr). 
**
** If argument eMode is set to INCRINIT_NORMAL, then all PmaReaders
** in the sub-tree headed by pReadr are also initialized. Data is then
** loaded into the buffers belonging to pReadr and it is set to point to
** the first key in its range.
**
** If argument eMode is set to INCRINIT_TASK, then pReadr is guaranteed
** to be a multi-threaded PmaReader and this function is being called in a
** background thread. In this case all PmaReaders in the sub-tree are
** initialized as for INCRINIT_NORMAL and the aFile[1] buffer belonging to
** pReadr is populated. However, pReadr itself is not set up to point
** to its first key. A call to vdbePmaReaderNext() is still required to do
** that.
**
** The reason this function does not call vdbePmaReaderNext() immediately
** in the INCRINIT_TASK case is that vdbePmaReaderNext() assumes that it has
** to block on thread (pTask->thread) before accessing aFile[1]. But, since
** this entire function is being run by thread (pTask->thread), that would
** lead to the current background thread attempting to join itself.
**
** Finally, if argument eMode is set to INCRINIT_ROOT, it may be assumed
** that pReadr->pIncr is a multi-threaded IncrMerge object, and that all
** child-trees have already been initialized using IncrInit(INCRINIT_TASK).
** In this case vdbePmaReaderNext() is called on all child PmaReaders and
** the current PmaReader is set to point to the first key in its range.
**
** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
*/
static int vdbePmaReaderIncrMergeInit(PmaReader *pReadr, int eMode){
  int rc = SQLITE_OK;
  IncrMerger *pIncr = pReadr->pIncr;
  SortSubtask *pTask = pIncr->pTask;
  sqlite3 *db = pTask->pSorter->db;

  /* eMode is always INCRINIT_NORMAL in single-threaded mode */
  assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL );

  /* Initialize the merge engine (and so the sub-tree) that feeds pIncr */
  rc = vdbeMergeEngineInit(pTask, pIncr->pMerger, eMode);

  /* Set up the required files for pIncr. A multi-threaded IncrMerge object
  ** requires two temp files to itself, whereas a single-threaded object
  ** only requires a region of pTask->file2. */
  if( rc==SQLITE_OK ){
    int mxSz = pIncr->mxSz;
#if SQLITE_MAX_WORKER_THREADS>0
    if( pIncr->bUseThread ){
      rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[0].pFd);
      if( rc==SQLITE_OK ){
        rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[1].pFd);
      }
    }else
#endif
    /*if( !pIncr->bUseThread )*/{
      /* Open pTask->file2 on first use. Until then file2.iEof holds the
      ** total size accumulated by vdbeIncrMergerNew(); once the file is
      ** open it is reset to zero and used as the next free offset. */
      if( pTask->file2.pFd==0 ){
        assert( pTask->file2.iEof>0 );
        rc = vdbeSorterOpenTempFile(db, pTask->file2.iEof, &pTask->file2.pFd);
        pTask->file2.iEof = 0;
      }
      /* Reserve an mxSz byte region of file2 for this IncrMerger */
      if( rc==SQLITE_OK ){
        pIncr->aFile[1].pFd = pTask->file2.pFd;
        pIncr->iStartOff = pTask->file2.iEof;
        pTask->file2.iEof += mxSz;
      }
    }
  }

#if SQLITE_MAX_WORKER_THREADS>0
  if( rc==SQLITE_OK && pIncr->bUseThread ){
    /* Use the current thread to populate aFile[1], even though this
    ** PmaReader is multi-threaded. If this is an INCRINIT_TASK object,
    ** then this function is already running in background thread
    ** pIncr->pTask->thread.
    **
    ** If this is the INCRINIT_ROOT object, then it is running in the
    ** main VDBE thread. But that is Ok, as that thread cannot return
    ** control to the VDBE or proceed with anything useful until the
    ** first results are ready from this merger object anyway.
    */
    assert( eMode==INCRINIT_ROOT || eMode==INCRINIT_TASK );
    rc = vdbeIncrPopulate(pIncr);
  }
#endif

  /* Position pReadr on its first key, except in the INCRINIT_TASK case
  ** where the caller must invoke vdbePmaReaderNext() later. */
  if( rc==SQLITE_OK && (SQLITE_MAX_WORKER_THREADS==0 || eMode!=INCRINIT_TASK) ){
    rc = vdbePmaReaderNext(pReadr);
  }

  return rc;
}

#if SQLITE_MAX_WORKER_THREADS>0
/*
** The main routine for vdbePmaReaderIncrMergeInit() operations run in
** background threads.
**
** pCtx is the PmaReader to initialize. The result of
** vdbePmaReaderIncrMergeInit(pReader, INCRINIT_TASK) is returned encoded
** as a pointer, and the owning task's bDone flag is set afterwards.
*/
static void *vdbePmaReaderBgIncrInit(void *pCtx){
  PmaReader *pReader = (PmaReader*)pCtx;
  void *pRet = SQLITE_INT_TO_PTR(
                  vdbePmaReaderIncrMergeInit(pReader,INCRINIT_TASK)
               );
  pReader->pIncr->pTask->bDone = 1;
  return pRet;
}
#endif

/*
** If the PmaReader passed as the first argument is not an incremental-reader
** (if pReadr->pIncr==0), then this function is a no-op. Otherwise, it invokes
** the vdbePmaReaderIncrMergeInit() function with the parameters passed to
** this routine to initialize the incremental merge.
**
** If the IncrMerger object is multi-threaded (IncrMerger.bUseThread==1),
** then a background thread is launched to call vdbePmaReaderIncrMergeInit().
** Or, if the IncrMerger is single threaded, the same function is called
** using the current thread.
002265 */ 002266 static int vdbePmaReaderIncrInit(PmaReader *pReadr, int eMode){ 002267 IncrMerger *pIncr = pReadr->pIncr; /* Incremental merger */ 002268 int rc = SQLITE_OK; /* Return code */ 002269 if( pIncr ){ 002270 #if SQLITE_MAX_WORKER_THREADS>0 002271 assert( pIncr->bUseThread==0 || eMode==INCRINIT_TASK ); 002272 if( pIncr->bUseThread ){ 002273 void *pCtx = (void*)pReadr; 002274 rc = vdbeSorterCreateThread(pIncr->pTask, vdbePmaReaderBgIncrInit, pCtx); 002275 }else 002276 #endif 002277 { 002278 rc = vdbePmaReaderIncrMergeInit(pReadr, eMode); 002279 } 002280 } 002281 return rc; 002282 } 002283 002284 /* 002285 ** Allocate a new MergeEngine object to merge the contents of nPMA level-0 002286 ** PMAs from pTask->file. If no error occurs, set *ppOut to point to 002287 ** the new object and return SQLITE_OK. Or, if an error does occur, set *ppOut 002288 ** to NULL and return an SQLite error code. 002289 ** 002290 ** When this function is called, *piOffset is set to the offset of the 002291 ** first PMA to read from pTask->file. Assuming no error occurs, it is 002292 ** set to the offset immediately following the last byte of the last 002293 ** PMA before returning. If an error does occur, then the final value of 002294 ** *piOffset is undefined. 
002295 */ 002296 static int vdbeMergeEngineLevel0( 002297 SortSubtask *pTask, /* Sorter task to read from */ 002298 int nPMA, /* Number of PMAs to read */ 002299 i64 *piOffset, /* IN/OUT: Readr offset in pTask->file */ 002300 MergeEngine **ppOut /* OUT: New merge-engine */ 002301 ){ 002302 MergeEngine *pNew; /* Merge engine to return */ 002303 i64 iOff = *piOffset; 002304 int i; 002305 int rc = SQLITE_OK; 002306 002307 *ppOut = pNew = vdbeMergeEngineNew(nPMA); 002308 if( pNew==0 ) rc = SQLITE_NOMEM_BKPT; 002309 002310 for(i=0; i<nPMA && rc==SQLITE_OK; i++){ 002311 i64 nDummy = 0; 002312 PmaReader *pReadr = &pNew->aReadr[i]; 002313 rc = vdbePmaReaderInit(pTask, &pTask->file, iOff, pReadr, &nDummy); 002314 iOff = pReadr->iEof; 002315 } 002316 002317 if( rc!=SQLITE_OK ){ 002318 vdbeMergeEngineFree(pNew); 002319 *ppOut = 0; 002320 } 002321 *piOffset = iOff; 002322 return rc; 002323 } 002324 002325 /* 002326 ** Return the depth of a tree comprising nPMA PMAs, assuming a fanout of 002327 ** SORTER_MAX_MERGE_COUNT. The returned value does not include leaf nodes. 002328 ** 002329 ** i.e. 002330 ** 002331 ** nPMA<=16 -> TreeDepth() == 0 002332 ** nPMA<=256 -> TreeDepth() == 1 002333 ** nPMA<=65536 -> TreeDepth() == 2 002334 */ 002335 static int vdbeSorterTreeDepth(int nPMA){ 002336 int nDepth = 0; 002337 i64 nDiv = SORTER_MAX_MERGE_COUNT; 002338 while( nDiv < (i64)nPMA ){ 002339 nDiv = nDiv * SORTER_MAX_MERGE_COUNT; 002340 nDepth++; 002341 } 002342 return nDepth; 002343 } 002344 002345 /* 002346 ** pRoot is the root of an incremental merge-tree with depth nDepth (according 002347 ** to vdbeSorterTreeDepth()). pLeaf is the iSeq'th leaf to be added to the 002348 ** tree, counting from zero. This function adds pLeaf to the tree. 002349 ** 002350 ** If successful, SQLITE_OK is returned. If an error occurs, an SQLite error 002351 ** code is returned and pLeaf is freed. 
002352 */ 002353 static int vdbeSorterAddToTree( 002354 SortSubtask *pTask, /* Task context */ 002355 int nDepth, /* Depth of tree according to TreeDepth() */ 002356 int iSeq, /* Sequence number of leaf within tree */ 002357 MergeEngine *pRoot, /* Root of tree */ 002358 MergeEngine *pLeaf /* Leaf to add to tree */ 002359 ){ 002360 int rc = SQLITE_OK; 002361 int nDiv = 1; 002362 int i; 002363 MergeEngine *p = pRoot; 002364 IncrMerger *pIncr; 002365 002366 rc = vdbeIncrMergerNew(pTask, pLeaf, &pIncr); 002367 002368 for(i=1; i<nDepth; i++){ 002369 nDiv = nDiv * SORTER_MAX_MERGE_COUNT; 002370 } 002371 002372 for(i=1; i<nDepth && rc==SQLITE_OK; i++){ 002373 int iIter = (iSeq / nDiv) % SORTER_MAX_MERGE_COUNT; 002374 PmaReader *pReadr = &p->aReadr[iIter]; 002375 002376 if( pReadr->pIncr==0 ){ 002377 MergeEngine *pNew = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT); 002378 if( pNew==0 ){ 002379 rc = SQLITE_NOMEM_BKPT; 002380 }else{ 002381 rc = vdbeIncrMergerNew(pTask, pNew, &pReadr->pIncr); 002382 } 002383 } 002384 if( rc==SQLITE_OK ){ 002385 p = pReadr->pIncr->pMerger; 002386 nDiv = nDiv / SORTER_MAX_MERGE_COUNT; 002387 } 002388 } 002389 002390 if( rc==SQLITE_OK ){ 002391 p->aReadr[iSeq % SORTER_MAX_MERGE_COUNT].pIncr = pIncr; 002392 }else{ 002393 vdbeIncrFree(pIncr); 002394 } 002395 return rc; 002396 } 002397 002398 /* 002399 ** This function is called as part of a SorterRewind() operation on a sorter 002400 ** that has already written two or more level-0 PMAs to one or more temp 002401 ** files. It builds a tree of MergeEngine/IncrMerger/PmaReader objects that 002402 ** can be used to incrementally merge all PMAs on disk. 002403 ** 002404 ** If successful, SQLITE_OK is returned and *ppOut set to point to the 002405 ** MergeEngine object at the root of the tree before returning. Or, if an 002406 ** error occurs, an SQLite error code is returned and the final value 002407 ** of *ppOut is undefined. 
*/
static int vdbeSorterMergeTreeBuild(
  VdbeSorter *pSorter,       /* The VDBE cursor that implements the sort */
  MergeEngine **ppOut        /* Write the MergeEngine here */
){
  MergeEngine *pMain = 0;
  int rc = SQLITE_OK;
  int iTask;

#if SQLITE_MAX_WORKER_THREADS>0
  /* If the sorter uses more than one task, then create the top-level
  ** MergeEngine here. This MergeEngine will read data from exactly
  ** one PmaReader per sub-task.  */
  assert( pSorter->bUseThreads || pSorter->nTask==1 );
  if( pSorter->nTask>1 ){
    pMain = vdbeMergeEngineNew(pSorter->nTask);
    if( pMain==0 ) rc = SQLITE_NOMEM_BKPT;
  }
#endif

  for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){
    SortSubtask *pTask = &pSorter->aTask[iTask];
    assert( pTask->nPMA>0 || SQLITE_MAX_WORKER_THREADS>0 );
    if( SQLITE_MAX_WORKER_THREADS==0 || pTask->nPMA ){
      MergeEngine *pRoot = 0;     /* Root node of tree for this task */
      int nDepth = vdbeSorterTreeDepth(pTask->nPMA);
      i64 iReadOff = 0;

      if( pTask->nPMA<=SORTER_MAX_MERGE_COUNT ){
        /* Few enough PMAs for a single merge engine to read them all */
        rc = vdbeMergeEngineLevel0(pTask, pTask->nPMA, &iReadOff, &pRoot);
      }else{
        /* Otherwise group the PMAs SORTER_MAX_MERGE_COUNT at a time into
        ** leaf merge engines and hang each one off the tree under pRoot. */
        int i;
        int iSeq = 0;
        pRoot = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
        if( pRoot==0 ) rc = SQLITE_NOMEM_BKPT;
        for(i=0; i<pTask->nPMA && rc==SQLITE_OK; i += SORTER_MAX_MERGE_COUNT){
          MergeEngine *pMerger = 0; /* New level-0 PMA merger */
          int nReader;              /* Number of level-0 PMAs to merge */

          nReader = MIN(pTask->nPMA - i, SORTER_MAX_MERGE_COUNT);
          rc = vdbeMergeEngineLevel0(pTask, nReader, &iReadOff, &pMerger);
          if( rc==SQLITE_OK ){
            rc = vdbeSorterAddToTree(pTask, nDepth, iSeq++, pRoot, pMerger);
          }
        }
      }

      if( rc==SQLITE_OK ){
#if SQLITE_MAX_WORKER_THREADS>0
        if( pMain!=0 ){
          /* Attach this task's tree to its slot in the top-level merger */
          rc = vdbeIncrMergerNew(pTask, pRoot, &pMain->aReadr[iTask].pIncr);
        }else
#endif
        {
          assert( pMain==0 );
          pMain = pRoot;
        }
      }else{
        vdbeMergeEngineFree(pRoot);
      }
    }
  }

  /* On error, free whatever was built and hand back NULL */
  if( rc!=SQLITE_OK ){
    vdbeMergeEngineFree(pMain);
    pMain = 0;
  }
  *ppOut = pMain;
  return rc;
}

/*
** This function is called as part of an sqlite3VdbeSorterRewind() operation
** on a sorter that has written two or more PMAs to temporary files. It sets
** up either VdbeSorter.pMerger (for single threaded sorters) or pReader
** (for multi-threaded sorters) so that it can be used to iterate through
** all records stored in the sorter.
**
** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
*/
static int vdbeSorterSetupMerge(VdbeSorter *pSorter){
  int rc;                         /* Return code */
  SortSubtask *pTask0 = &pSorter->aTask[0];
  MergeEngine *pMain = 0;
#if SQLITE_MAX_WORKER_THREADS
  sqlite3 *db = pTask0->pSorter->db;
  int i;
  /* Install the same comparison function on every sub-task */
  SorterCompare xCompare = vdbeSorterGetCompare(pSorter);
  for(i=0; i<pSorter->nTask; i++){
    pSorter->aTask[i].xCompare = xCompare;
  }
#endif

  rc = vdbeSorterMergeTreeBuild(pSorter, &pMain);
  if( rc==SQLITE_OK ){
#if SQLITE_MAX_WORKER_THREADS
    assert( pSorter->bUseThreads==0 || pSorter->nTask>1 );
    if( pSorter->bUseThreads ){
      int iTask;
      PmaReader *pReadr = 0;
      SortSubtask *pLast = &pSorter->aTask[pSorter->nTask-1];
      rc = vdbeSortAllocUnpacked(pLast);
      if( rc==SQLITE_OK ){
        pReadr = (PmaReader*)sqlite3DbMallocZero(db, sizeof(PmaReader));
        pSorter->pReader = pReadr;
        if( pReadr==0 ) rc = SQLITE_NOMEM_BKPT;
      }
      if( rc==SQLITE_OK ){
        /* Wrap the top-level merger in an IncrMerger owned by the last
        ** task. vdbeIncrMergerNew() frees pMain if it fails. */
        rc = vdbeIncrMergerNew(pLast, pMain, &pReadr->pIncr);
        if( rc==SQLITE_OK ){
          vdbeIncrMergerSetThreads(pReadr->pIncr);
          /* Every sub-task except the last runs in multi-threaded mode */
          for(iTask=0; iTask<(pSorter->nTask-1); iTask++){
            IncrMerger *pIncr;
            if( (pIncr = pMain->aReadr[iTask].pIncr) ){
              vdbeIncrMergerSetThreads(pIncr);
              assert( pIncr->pTask!=pLast );
            }
          }
          for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){
            /* Check that:
            **
            **   a) The incremental merge object is configured to use the
            **      right task, and
            **   b) If it is using task (nTask-1), it is configured to run
            **      in single-threaded mode. This is important, as the
            **      root merge (INCRINIT_ROOT) will be using the same task
            **      object.
            */
            PmaReader *p = &pMain->aReadr[iTask];
            assert( p->pIncr==0 || (
                (p->pIncr->pTask==&pSorter->aTask[iTask])             /* a */
             && (iTask!=pSorter->nTask-1 || p->pIncr->bUseThread==0)  /* b */
            ));
            rc = vdbePmaReaderIncrInit(p, INCRINIT_TASK);
          }
        }
        /* pMain has either been handed to pReadr->pIncr or freed by
        ** vdbeIncrMergerNew(); it must not be freed again below. */
        pMain = 0;
      }
      if( rc==SQLITE_OK ){
        rc = vdbePmaReaderIncrMergeInit(pReadr, INCRINIT_ROOT);
      }
    }else
#endif
    {
      /* Single-threaded: the sorter takes over pMain as its merger,
      ** whether or not initialization succeeds. */
      rc = vdbeMergeEngineInit(pTask0, pMain, INCRINIT_NORMAL);
      pSorter->pMerger = pMain;
      pMain = 0;
    }
  }

  if( rc!=SQLITE_OK ){
    vdbeMergeEngineFree(pMain);
  }
  return rc;
}


/*
** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite,
** this function is called to prepare for iterating through the records
** in sorted order.
**
** *pbEof is set to 1 if the sorter contains no records, or to 0 if at
** least one record is available. It is not written if an error occurs on
** the PMA path. SQLITE_OK is returned on success, or an SQLite error code
** otherwise.
*/
int sqlite3VdbeSorterRewind(const VdbeCursor *pCsr, int *pbEof){
  VdbeSorter *pSorter;
  int rc = SQLITE_OK;             /* Return code */

  assert( pCsr->eCurType==CURTYPE_SORTER );
  pSorter = pCsr->uc.pSorter;
  assert( pSorter );

  /* If no data has been written to disk, then do not do so now. Instead,
  ** sort the in-memory list (VdbeSorter.list). The vdbe layer will read
  ** data directly from the in-memory list.  */
  if( pSorter->bUsePMA==0 ){
    if( pSorter->list.pList ){
      *pbEof = 0;
      rc = vdbeSorterSort(&pSorter->aTask[0], &pSorter->list);
    }else{
      *pbEof = 1;
    }
    return rc;
  }

  /* Write the current in-memory list to a PMA. When the VdbeSorterWrite()
  ** function flushes the contents of memory to disk, it always creates a
  ** new list consisting of a single key immediately afterwards. So the
  ** list is never empty at this point.  */
  assert( pSorter->list.pList );
  rc = vdbeSorterFlushPMA(pSorter);

  /* Join all threads. The flush result is passed in so that it can be
  ** combined with the results of the joins. */
  rc = vdbeSorterJoinAll(pSorter, rc);

  vdbeSorterRewindDebug("rewind");

  /* Assuming no errors have occurred, set up a merger structure to
  ** incrementally read and merge all remaining PMAs.  */
  assert( pSorter->pReader==0 );
  if( rc==SQLITE_OK ){
    rc = vdbeSorterSetupMerge(pSorter);
    *pbEof = 0;
  }

  vdbeSorterRewindDebug("rewinddone");
  return rc;
}

/*
** Advance to the next element in the sorter.
002617 */ 002618 int sqlite3VdbeSorterNext(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){ 002619 VdbeSorter *pSorter; 002620 int rc; /* Return code */ 002621 002622 assert( pCsr->eCurType==CURTYPE_SORTER ); 002623 pSorter = pCsr->uc.pSorter; 002624 assert( pSorter->bUsePMA || (pSorter->pReader==0 && pSorter->pMerger==0) ); 002625 if( pSorter->bUsePMA ){ 002626 assert( pSorter->pReader==0 || pSorter->pMerger==0 ); 002627 assert( pSorter->bUseThreads==0 || pSorter->pReader ); 002628 assert( pSorter->bUseThreads==1 || pSorter->pMerger ); 002629 #if SQLITE_MAX_WORKER_THREADS>0 002630 if( pSorter->bUseThreads ){ 002631 rc = vdbePmaReaderNext(pSorter->pReader); 002632 *pbEof = (pSorter->pReader->pFd==0); 002633 }else 002634 #endif 002635 /*if( !pSorter->bUseThreads )*/ { 002636 assert( pSorter->pMerger!=0 ); 002637 assert( pSorter->pMerger->pTask==(&pSorter->aTask[0]) ); 002638 rc = vdbeMergeEngineStep(pSorter->pMerger, pbEof); 002639 } 002640 }else{ 002641 SorterRecord *pFree = pSorter->list.pList; 002642 pSorter->list.pList = pFree->u.pNext; 002643 pFree->u.pNext = 0; 002644 if( pSorter->list.aMemory==0 ) vdbeSorterRecordFree(db, pFree); 002645 *pbEof = !pSorter->list.pList; 002646 rc = SQLITE_OK; 002647 } 002648 return rc; 002649 } 002650 002651 /* 002652 ** Return a pointer to a buffer owned by the sorter that contains the 002653 ** current key. 
002654 */ 002655 static void *vdbeSorterRowkey( 002656 const VdbeSorter *pSorter, /* Sorter object */ 002657 int *pnKey /* OUT: Size of current key in bytes */ 002658 ){ 002659 void *pKey; 002660 if( pSorter->bUsePMA ){ 002661 PmaReader *pReader; 002662 #if SQLITE_MAX_WORKER_THREADS>0 002663 if( pSorter->bUseThreads ){ 002664 pReader = pSorter->pReader; 002665 }else 002666 #endif 002667 /*if( !pSorter->bUseThreads )*/{ 002668 pReader = &pSorter->pMerger->aReadr[pSorter->pMerger->aTree[1]]; 002669 } 002670 *pnKey = pReader->nKey; 002671 pKey = pReader->aKey; 002672 }else{ 002673 *pnKey = pSorter->list.pList->nVal; 002674 pKey = SRVAL(pSorter->list.pList); 002675 } 002676 return pKey; 002677 } 002678 002679 /* 002680 ** Copy the current sorter key into the memory cell pOut. 002681 */ 002682 int sqlite3VdbeSorterRowkey(const VdbeCursor *pCsr, Mem *pOut){ 002683 VdbeSorter *pSorter; 002684 void *pKey; int nKey; /* Sorter key to copy into pOut */ 002685 002686 assert( pCsr->eCurType==CURTYPE_SORTER ); 002687 pSorter = pCsr->uc.pSorter; 002688 pKey = vdbeSorterRowkey(pSorter, &nKey); 002689 if( sqlite3VdbeMemClearAndResize(pOut, nKey) ){ 002690 return SQLITE_NOMEM_BKPT; 002691 } 002692 pOut->n = nKey; 002693 MemSetTypeFlag(pOut, MEM_Blob); 002694 memcpy(pOut->z, pKey, nKey); 002695 002696 return SQLITE_OK; 002697 } 002698 002699 /* 002700 ** Compare the key in memory cell pVal with the key that the sorter cursor 002701 ** passed as the first argument currently points to. For the purposes of 002702 ** the comparison, ignore the rowid field at the end of each record. 002703 ** 002704 ** If the sorter cursor key contains any NULL values, consider it to be 002705 ** less than pVal. Even if pVal also contains NULL values. 002706 ** 002707 ** If an error occurs, return an SQLite error code (i.e. SQLITE_NOMEM). 
002708 ** Otherwise, set *pRes to a negative, zero or positive value if the 002709 ** key in pVal is smaller than, equal to or larger than the current sorter 002710 ** key. 002711 ** 002712 ** This routine forms the core of the OP_SorterCompare opcode, which in 002713 ** turn is used to verify uniqueness when constructing a UNIQUE INDEX. 002714 */ 002715 int sqlite3VdbeSorterCompare( 002716 const VdbeCursor *pCsr, /* Sorter cursor */ 002717 Mem *pVal, /* Value to compare to current sorter key */ 002718 int nKeyCol, /* Compare this many columns */ 002719 int *pRes /* OUT: Result of comparison */ 002720 ){ 002721 VdbeSorter *pSorter; 002722 UnpackedRecord *r2; 002723 KeyInfo *pKeyInfo; 002724 int i; 002725 void *pKey; int nKey; /* Sorter key to compare pVal with */ 002726 002727 assert( pCsr->eCurType==CURTYPE_SORTER ); 002728 pSorter = pCsr->uc.pSorter; 002729 r2 = pSorter->pUnpacked; 002730 pKeyInfo = pCsr->pKeyInfo; 002731 if( r2==0 ){ 002732 r2 = pSorter->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pKeyInfo); 002733 if( r2==0 ) return SQLITE_NOMEM_BKPT; 002734 r2->nField = nKeyCol; 002735 } 002736 assert( r2->nField==nKeyCol ); 002737 002738 pKey = vdbeSorterRowkey(pSorter, &nKey); 002739 sqlite3VdbeRecordUnpack(pKeyInfo, nKey, pKey, r2); 002740 for(i=0; i<nKeyCol; i++){ 002741 if( r2->aMem[i].flags & MEM_Null ){ 002742 *pRes = -1; 002743 return SQLITE_OK; 002744 } 002745 } 002746 002747 *pRes = sqlite3VdbeRecordCompare(pVal->n, pVal->z, r2); 002748 return SQLITE_OK; 002749 }