LCOV - code coverage report
Current view: top level - src - cloudsync.c (source / functions) Coverage Total Hit
Test: coverage.info Lines: 86.9 % 2124 1846
Test Date: 2026-04-24 22:00:03 Functions: 91.4 % 128 117

            Line data    Source code
       1              : //
       2              : //  cloudsync.c
       3              : //  cloudsync
       4              : //
       5              : //  Created by Marco Bambini on 16/05/24.
       6              : //
       7              : 
       8              : #include <inttypes.h>
       9              : #include <stdbool.h>
      10              : #include <limits.h>
      11              : #include <stdint.h>
      12              : #include <stdlib.h>
      13              : #include <string.h>
      14              : #include <assert.h>
      15              : #include <stdio.h>
      16              : #include <errno.h>
      17              : #include <math.h>
      18              : 
      19              : #include "cloudsync.h"
      20              : #include "lz4.h"
      21              : #include "pk.h"
      22              : #include "sql.h"
      23              : #include "utils.h"
      24              : #include "dbutils.h"
      25              : #include "block.h"
      26              : 
      27              : #ifdef _WIN32
      28              : #include <winsock2.h>
      29              : #include <ws2tcpip.h>
      30              : #else
      31              : #include <arpa/inet.h>                          // for htonl, htons, ntohl, ntohs
      32              : #include <netinet/in.h>                         // for struct sockaddr_in, INADDR_ANY, etc. (if needed)
      33              : #endif
      34              : 
      35              : #ifndef htonll
      36              : #if __BIG_ENDIAN__
      37              : #define htonll(x)                               (x)
      38              : #define ntohll(x)                               (x)
      39              : #else
      40              : #ifndef htobe64
      41              : #define htonll(x)                               ((uint64_t)htonl((x) & 0xFFFFFFFF) << 32 | (uint64_t)htonl((x) >> 32))
      42              : #define ntohll(x)                               ((uint64_t)ntohl((x) & 0xFFFFFFFF) << 32 | (uint64_t)ntohl((x) >> 32))
      43              : #else
      44              : #define htonll(x)                               htobe64(x)
      45              : #define ntohll(x)                               be64toh(x)
      46              : #endif
      47              : #endif
      48              : #endif
      49              : 
      50              : #define CLOUDSYNC_INIT_NTABLES                  64
      51              : #define CLOUDSYNC_MIN_DB_VERSION                0
      52              : 
      53              : #define CLOUDSYNC_PAYLOAD_MINBUF_SIZE                   (512*1024)
      54              : #define CLOUDSYNC_PAYLOAD_SIGNATURE                     0x434C5359  /* 'C','L','S','Y' */
      55              : #define CLOUDSYNC_PAYLOAD_VERSION_ORIGNAL               1
      56              : #define CLOUDSYNC_PAYLOAD_VERSION_1                     CLOUDSYNC_PAYLOAD_VERSION_ORIGNAL
      57              : #define CLOUDSYNC_PAYLOAD_VERSION_2                     2
      58              : #define CLOUDSYNC_PAYLOAD_VERSION_LATEST                CLOUDSYNC_PAYLOAD_VERSION_2
      59              : #define CLOUDSYNC_PAYLOAD_MIN_VERSION_WITH_CHECKSUM     CLOUDSYNC_PAYLOAD_VERSION_2
      60              : 
      61              : #ifndef MAX
      62              : #define MAX(a, b)                               (((a)>(b))?(a):(b))
      63              : #endif
      64              : 
      65              : #define DEBUG_DBERROR(_rc, _fn, _data)   do {if (_rc != DBRES_OK) printf("Error in %s: %s\n", _fn, database_errmsg(_data));} while (0)
      66              : 
      67              : typedef enum {
      68              :     CLOUDSYNC_PK_INDEX_TBL          = 0,
      69              :     CLOUDSYNC_PK_INDEX_PK           = 1,
      70              :     CLOUDSYNC_PK_INDEX_COLNAME      = 2,
      71              :     CLOUDSYNC_PK_INDEX_COLVALUE     = 3,
      72              :     CLOUDSYNC_PK_INDEX_COLVERSION   = 4,
      73              :     CLOUDSYNC_PK_INDEX_DBVERSION    = 5,
      74              :     CLOUDSYNC_PK_INDEX_SITEID       = 6,
      75              :     CLOUDSYNC_PK_INDEX_CL           = 7,
      76              :     CLOUDSYNC_PK_INDEX_SEQ          = 8
      77              : } CLOUDSYNC_PK_INDEX;
      78              : 
      79              : typedef enum {
      80              :     DBVM_VALUE_ERROR      = -1,
      81              :     DBVM_VALUE_UNCHANGED  = 0,
      82              :     DBVM_VALUE_CHANGED    = 1,
      83              : } DBVM_VALUE;
      84              : 
      85              : #define SYNCBIT_SET(_data)                  _data->insync = 1
      86              : #define SYNCBIT_RESET(_data)                _data->insync = 0
      87              : 
      88              : // MARK: - Deferred column-batch merge -
      89              : 
      90              : typedef struct {
      91              :     const char  *col_name;       // pointer into table_context->col_name[idx] (stable)
      92              :     dbvalue_t   *col_value;      // duplicated via database_value_dup (owned)
      93              :     int64_t     col_version;
      94              :     int64_t     db_version;
      95              :     uint8_t     site_id[UUID_LEN];
      96              :     int         site_id_len;
      97              :     int64_t     seq;
      98              : } merge_pending_entry;
      99              : 
     100              : typedef struct {
     101              :     cloudsync_table_context *table;
     102              :     char        *pk;             // malloc'd copy, freed on flush
     103              :     int         pk_len;
     104              :     int64_t     cl;
     105              :     bool        sentinel_pending;
     106              :     bool        row_exists;       // true when the PK already exists locally
     107              :     int         count;
     108              :     int         capacity;
     109              :     merge_pending_entry *entries;
     110              : 
     111              :     // Statement cache — reuse the prepared statement when the column
     112              :     // combination and row_exists flag match between consecutive PK flushes.
     113              :     dbvm_t      *cached_vm;
     114              :     bool        cached_row_exists;
     115              :     int         cached_col_count;
     116              :     const char  **cached_col_names; // array of pointers into table_context (not owned)
     117              : } merge_pending_batch;
     118              : 
     119              : // MARK: -
     120              : 
     121              : struct cloudsync_pk_decode_bind_context {
     122              :     dbvm_t      *vm;
     123              :     char        *tbl;
     124              :     int64_t     tbl_len;
     125              :     const void  *pk;
     126              :     int64_t     pk_len;
     127              :     char        *col_name;
     128              :     int64_t     col_name_len;
     129              :     int64_t     col_version;
     130              :     int64_t     db_version;
     131              :     const void  *site_id;
     132              :     int64_t     site_id_len;
     133              :     int64_t     cl;
     134              :     int64_t     seq;
     135              : };
     136              : 
     137              : struct cloudsync_context {
     138              :     void        *db;
     139              :     char        errmsg[1024];
     140              :     int         errcode;
     141              :     
     142              :     char        *libversion;
     143              :     uint8_t     site_id[UUID_LEN];
     144              :     int         insync;
     145              :     int         debug;
     146              :     bool        merge_equal_values;
     147              :     void        *aux_data;
     148              :     
     149              :     // stmts and context values
     150              :     dbvm_t      *schema_version_stmt;
     151              :     dbvm_t      *data_version_stmt;
     152              :     dbvm_t      *db_version_stmt;
     153              :     dbvm_t      *getset_siteid_stmt;
     154              :     int         data_version;
     155              :     int         schema_version;
     156              :     uint64_t    schema_hash;
     157              :     
     158              :     // set at transaction start and reset on commit/rollback
     159              :     int64_t    db_version;
     160              :     // version the DB would have if the transaction committed now
     161              :     int64_t    pending_db_version;
     162              :     // used to set an order inside each transaction
     163              :     int        seq;
     164              :     
     165              :     // optional schema_name to be set in the cloudsync_table_context
     166              :     char       *current_schema;
     167              :     
     168              :     // augmented tables are stored in-memory so we do not need to retrieve information about
     169              :     // col_names and cid from the disk each time a write statement is performed
     170              :     // we do also not need to use an hash map here because for few tables the direct
     171              :     // in-memory comparison with table name is faster
     172              :     cloudsync_table_context **tables;   // dense vector: [0..tables_count-1] are valid
     173              :     int tables_count;                   // size
     174              :     int tables_cap;                     // capacity
     175              : 
     176              :     int skip_decode_idx;                // -1 in sqlite, col_value index in postgresql
     177              : 
     178              :     // deferred column-batch merge (active during payload_apply)
     179              :     merge_pending_batch *pending_batch;
     180              : };
     181              : 
     182              : struct cloudsync_table_context {
     183              :     table_algo  algo;                           // CRDT algoritm associated to the table
     184              :     char        *name;                          // table name
     185              :     char        *schema;                        // table schema
     186              :     char        *meta_ref;                      // schema-qualified meta table name (e.g. "schema"."name_cloudsync")
     187              :     char        *base_ref;                      // schema-qualified base table name (e.g. "schema"."name")
     188              :     char        **col_name;                     // array of column names
     189              :     dbvm_t      **col_merge_stmt;               // array of merge insert stmt (indexed by col_name)
     190              :     dbvm_t      **col_value_stmt;               // array of column value stmt (indexed by col_name)
     191              :     int         *col_id;                        // array of column id
     192              :     col_algo_t  *col_algo;                      // per-column algorithm (normal or block)
     193              :     char        **col_delimiter;                // per-column delimiter for block splitting (NULL = default "\n")
     194              :     bool        has_block_cols;                 // quick check: does this table have any block columns?
     195              :     dbvm_t      *block_value_read_stmt;         // SELECT col_value FROM blocks table
     196              :     dbvm_t      *block_value_write_stmt;        // INSERT OR REPLACE into blocks table
     197              :     dbvm_t      *block_value_delete_stmt;       // DELETE from blocks table
     198              :     dbvm_t      *block_list_stmt;               // SELECT block entries for materialization
     199              :     char        *blocks_ref;                    // schema-qualified blocks table name
     200              :     int         ncols;                          // number of non primary key cols
     201              :     int         npks;                           // number of primary key cols
     202              :     bool        enabled;                        // flag to check if a table is enabled or disabled
     203              :     #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
     204              :     bool        rowid_only;                     // a table with no primary keys other than the implicit rowid
     205              :     #endif
     206              :     
     207              :     char        **pk_name;                      // array of primary key names
     208              : 
     209              :     // precompiled statements
     210              :     dbvm_t      *meta_pkexists_stmt;            // check if a primary key already exist in the augmented table
     211              :     dbvm_t      *meta_sentinel_update_stmt;     // update a local sentinel row
     212              :     dbvm_t      *meta_sentinel_insert_stmt;     // insert a local sentinel row
     213              :     dbvm_t      *meta_row_insert_update_stmt;   // insert/update a local row
     214              :     dbvm_t      *meta_row_drop_stmt;            // delete rows from meta
     215              :     dbvm_t      *meta_update_move_stmt;         // update rows in meta when pk changes
     216              :     dbvm_t      *meta_local_cl_stmt;            // compute local cl value
     217              :     dbvm_t      *meta_winner_clock_stmt;        // get the rowid of the last inserted/updated row in the meta table
     218              :     dbvm_t      *meta_merge_delete_drop;
     219              :     dbvm_t      *meta_zero_clock_stmt;
     220              :     dbvm_t      *meta_col_version_stmt;
     221              :     dbvm_t      *meta_site_id_stmt;
     222              :     
     223              :     dbvm_t      *real_col_values_stmt;          // retrieve all column values based on pk
     224              :     dbvm_t      *real_merge_delete_stmt;
     225              :     dbvm_t      *real_merge_sentinel_stmt;
     226              :     
     227              :     bool        is_altering;                    // flag to track if a table alteration is in progress
     228              : 
     229              :     // context
     230              :     cloudsync_context *context;
     231              : };
     232              : 
     233              : struct cloudsync_payload_context {
     234              :     char        *buffer;
     235              :     size_t      bsize;
     236              :     size_t      balloc;
     237              :     size_t      bused;
     238              :     uint64_t    nrows;
     239              :     uint16_t    ncols;
     240              : };
     241              : 
     242              : #ifdef _MSC_VER
     243              :     #pragma pack(push, 1) // For MSVC: pack struct with 1-byte alignment
     244              :     #define PACKED
     245              : #else
     246              :     #define PACKED __attribute__((__packed__))
     247              : #endif
     248              : 
     249              : typedef struct PACKED {
     250              :     uint32_t    signature;          // 'CLSY'
     251              :     uint8_t     version;            // protocol version
     252              :     uint8_t     libversion[3];      // major.minor.patch
     253              :     uint32_t    expanded_size;
     254              :     uint16_t    ncols;
     255              :     uint32_t    nrows;
     256              :     uint64_t    schema_hash;
     257              :     uint8_t     checksum[6];        // 48 bits checksum (to ensure struct is 32 bytes)
     258              : } cloudsync_payload_header;
     259              : 
     260              : #ifdef _MSC_VER
     261              :     #pragma pack(pop)
     262              : #endif
     263              : 
     264              : #if CLOUDSYNC_UNITTEST
     265              : bool force_uncompressed_blob = false;
     266              : #define CHECK_FORCE_UNCOMPRESSED_BUFFER()   if (force_uncompressed_blob) use_uncompressed_buffer = true
     267              : #else
     268              : #define CHECK_FORCE_UNCOMPRESSED_BUFFER()
     269              : #endif
     270              : 
     271              : // Internal prototypes
     272              : int local_mark_insert_or_update_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const char *col_name, int64_t db_version, int seq);
     273              : 
     274              : // MARK: - CRDT algos -
     275              : 
     276          622 : table_algo cloudsync_algo_from_name (const char *algo_name) {
     277          622 :     if (algo_name == NULL) return table_algo_none;
     278              :     
     279          622 :     if ((strcasecmp(algo_name, "CausalLengthSet") == 0) || (strcasecmp(algo_name, "cls") == 0)) return table_algo_crdt_cls;
     280          266 :     if ((strcasecmp(algo_name, "GrowOnlySet") == 0) || (strcasecmp(algo_name, "gos") == 0)) return table_algo_crdt_gos;
     281          259 :     if ((strcasecmp(algo_name, "DeleteWinsSet") == 0) || (strcasecmp(algo_name, "dws") == 0)) return table_algo_crdt_dws;
     282          258 :     if ((strcasecmp(algo_name, "AddWinsSet") == 0) || (strcasecmp(algo_name, "aws") == 0)) return table_algo_crdt_aws;
     283              :     
     284              :     // if nothing is found
     285          257 :     return table_algo_none;
     286          622 : }
     287              : 
     288          110 : const char *cloudsync_algo_name (table_algo algo) {
     289          110 :     switch (algo) {
     290          105 :         case table_algo_crdt_cls: return "cls";
     291            1 :         case table_algo_crdt_gos: return "gos";
     292            1 :         case table_algo_crdt_dws: return "dws";
     293            1 :         case table_algo_crdt_aws: return "aws";
     294            1 :         case table_algo_none: return NULL;
     295              :     }
     296            1 :     return NULL;
     297          110 : }
     298              : 
     299              : // MARK: - DBVM Utils -
     300              : 
     301        32019 : DBVM_VALUE dbvm_execute (dbvm_t *stmt, cloudsync_context *data) {
     302        32019 :     if (!stmt) return DBVM_VALUE_ERROR;
     303              : 
     304        32017 :     int rc = databasevm_step(stmt);
     305        32017 :     if (rc != DBRES_ROW && rc != DBRES_DONE) {
     306            2 :         if (data) DEBUG_DBERROR(rc, "stmt_execute", data);
     307            2 :         databasevm_reset(stmt);
     308            2 :         return DBVM_VALUE_ERROR;
     309              :     }
     310              : 
     311        32015 :     DBVM_VALUE result = DBVM_VALUE_CHANGED;
     312        32015 :     if (stmt == data->data_version_stmt) {
     313        29607 :         int version = (int)database_column_int(stmt, 0);
     314        29607 :         if (version != data->data_version) {
     315          225 :             data->data_version = version;
     316          225 :         } else {
     317        29382 :             result = DBVM_VALUE_UNCHANGED;
     318              :         }
     319        32015 :     } else if (stmt == data->schema_version_stmt) {
     320         1204 :         int version = (int)database_column_int(stmt, 0);
     321         1204 :         if (version > data->schema_version) {
     322          380 :             data->schema_version = version;
     323          380 :         } else {
     324          824 :             result = DBVM_VALUE_UNCHANGED;
     325              :         }
     326              :         
     327         2408 :     } else if (stmt == data->db_version_stmt) {
     328         1204 :         data->db_version = (rc == DBRES_DONE) ? CLOUDSYNC_MIN_DB_VERSION : database_column_int(stmt, 0);
     329         1204 :     }
     330              :     
     331        32015 :     databasevm_reset(stmt);
     332        32015 :     return result;
     333        32019 : }
     334              : 
     335         3587 : int dbvm_count (dbvm_t *stmt, const char *value, size_t len, int type) {
     336         3587 :     int result = -1;
     337         3587 :     int rc = DBRES_OK;
     338              :     
     339         3587 :     if (value) {
     340         3586 :         rc = (type == DBTYPE_TEXT) ? databasevm_bind_text(stmt, 1, value, (int)len) : databasevm_bind_blob(stmt, 1, value, len);
     341         3586 :         if (rc != DBRES_OK) goto cleanup;
     342         3586 :     }
     343              : 
     344         3587 :     rc = databasevm_step(stmt);
     345         7174 :     if (rc == DBRES_DONE) {
     346            1 :         result = 0;
     347            1 :         rc = DBRES_OK;
     348         3587 :     } else if (rc == DBRES_ROW) {
     349         3586 :         result = (int)database_column_int(stmt, 0);
     350         3586 :         rc = DBRES_OK;
     351         3586 :     }
     352              :     
     353              : cleanup:
     354         3587 :     databasevm_reset(stmt);
     355         3587 :     return result;
     356              : }
     357              : 
     358       255427 : void dbvm_reset (dbvm_t *stmt) {
     359       255427 :     if (!stmt) return;
     360       232452 :     databasevm_clear_bindings(stmt);
     361       232452 :     databasevm_reset(stmt);
     362       255427 : }
     363              : 
     364              : // MARK: - Database Version -
     365              : 
     366          671 : int cloudsync_dbversion_build_query (cloudsync_context *data, char **sql_out) {
     367              :     // this function must be manually called each time tables changes
     368              :     // because the query plan changes too and it must be re-prepared
     369              :     // unfortunately there is no other way
     370              : 
     371              :     // we need to execute a query like:
     372              :     /*
     373              :      SELECT max(version) as version FROM (
     374              :          SELECT max(db_version) as version FROM "table1_cloudsync"
     375              :          UNION ALL
     376              :          SELECT max(db_version) as version FROM "table2_cloudsync"
     377              :          UNION ALL
     378              :          SELECT max(db_version) as version FROM "table3_cloudsync"
     379              :          UNION
     380              :          SELECT value as version FROM cloudsync_settings WHERE key = 'pre_alter_dbversion'
     381              :      )
     382              :      */
     383              : 
     384              :     // the good news is that the query can be computed in SQLite without the need to do any extra computation from the host language
     385              : 
     386          671 :     *sql_out = NULL;
     387          671 :     return database_select_text(data, SQL_DBVERSION_BUILD_QUERY, sql_out);
     388              : }
     389              : 
     390          894 : int cloudsync_dbversion_rebuild (cloudsync_context *data) {
     391          894 :     if (data->db_version_stmt) {
     392          443 :         databasevm_finalize(data->db_version_stmt);
     393          443 :         data->db_version_stmt = NULL;
     394          443 :     }
     395              : 
     396          894 :     int64_t count = dbutils_table_settings_count_tables(data);
     397          894 :     if (count == 0) return DBRES_OK;
     398          671 :     else if (count == -1) return cloudsync_set_dberror(data);
     399              : 
     400          671 :     char *sql = NULL;
     401          671 :     int rc = cloudsync_dbversion_build_query(data, &sql);
     402          671 :     if (rc != DBRES_OK) return cloudsync_set_dberror(data);
     403              : 
     404              :     // A NULL SQL with rc == OK means the generator produced a NULL row:
     405              :     // sqlite_master has no *_cloudsync meta-tables (for example, the user
     406              :     // dropped the base table and its meta-table without calling
     407              :     // cloudsync_cleanup, leaving stale cloudsync_table_settings rows).
     408              :     // Treat this the same as count == 0: no prepared statement, db_version
     409              :     // stays at the minimum and will be rebuilt on the next cloudsync_init.
     410              :     // Genuine errors from database_select_text are handled above.
     411          670 :     if (!sql) return DBRES_OK;
     412              :     DEBUG_SQL("db_version_stmt: %s", sql);
     413              : 
     414          669 :     rc = databasevm_prepare(data, sql, (void **)&data->db_version_stmt, DBFLAG_PERSISTENT);
     415              :     DEBUG_STMT("db_version_stmt %p", data->db_version_stmt);
     416          669 :     cloudsync_memory_free(sql);
     417          669 :     return rc;
     418          894 : }
     419              : 
     420         1204 : int cloudsync_dbversion_rerun (cloudsync_context *data) {
     421         1204 :     DBVM_VALUE schema_changed = dbvm_execute(data->schema_version_stmt, data);
     422         1204 :     if (schema_changed == DBVM_VALUE_ERROR) return -1;
     423              : 
     424         1204 :     if (schema_changed == DBVM_VALUE_CHANGED) {
     425          380 :         int rc = cloudsync_dbversion_rebuild(data);
     426          380 :         if (rc != DBRES_OK) return -1;
     427          380 :     }
     428              : 
     429         1204 :     if (!data->db_version_stmt) {
     430            0 :         data->db_version = CLOUDSYNC_MIN_DB_VERSION;
     431            0 :         return 0;
     432              :     }
     433              : 
     434         1204 :     DBVM_VALUE rc = dbvm_execute(data->db_version_stmt, data);
     435         1204 :     if (rc == DBVM_VALUE_ERROR) return -1;
     436         1204 :     return 0;
     437         1204 : }
     438              : 
     439        29609 : int cloudsync_dbversion_check_uptodate (cloudsync_context *data) {
     440              :     // perform a PRAGMA data_version to check if some other process write any data
     441        29609 :     DBVM_VALUE rc = dbvm_execute(data->data_version_stmt, data);
     442        29609 :     if (rc == DBVM_VALUE_ERROR) return -1;
     443              :     
     444              :     // db_version is already set and there is no need to update it
     445        29607 :     if (data->db_version != CLOUDSYNC_VALUE_NOTSET && rc == DBVM_VALUE_UNCHANGED) return 0;
     446              :     
     447         1204 :     return cloudsync_dbversion_rerun(data);
     448        29609 : }
     449              : 
     450        29583 : int64_t cloudsync_dbversion_next (cloudsync_context *data, int64_t merging_version) {
     451        29583 :     int rc = cloudsync_dbversion_check_uptodate(data);
     452        29583 :     if (rc != DBRES_OK) return -1;
     453              :     
     454        29582 :     int64_t result = data->db_version + 1;
     455        29582 :     if (result < data->pending_db_version) result = data->pending_db_version;
     456        29582 :     if (merging_version != CLOUDSYNC_VALUE_NOTSET && result < merging_version) result = merging_version;
     457        29582 :     data->pending_db_version = result;
     458              :     
     459        29582 :     return result;
     460        29583 : }
     461              : 
     462              : // MARK: - PK Context -
     463              : 
     464            0 : char *cloudsync_pk_context_tbl (cloudsync_pk_decode_bind_context *ctx, int64_t *tbl_len) {
     465            0 :     *tbl_len = ctx->tbl_len;
     466            0 :     return ctx->tbl;
     467              : }
     468              : 
     469            0 : void *cloudsync_pk_context_pk (cloudsync_pk_decode_bind_context *ctx, int64_t *pk_len) {
     470            0 :     *pk_len = ctx->pk_len;
     471            0 :     return (void *)ctx->pk;
     472              : }
     473              : 
     474            0 : char *cloudsync_pk_context_colname (cloudsync_pk_decode_bind_context *ctx, int64_t *colname_len) {
     475            0 :     *colname_len = ctx->col_name_len;
     476            0 :     return ctx->col_name;
     477              : }
     478              : 
     479            0 : int64_t cloudsync_pk_context_cl (cloudsync_pk_decode_bind_context *ctx) {
     480            0 :     return ctx->cl;
     481              : }
     482              : 
     483            0 : int64_t cloudsync_pk_context_dbversion (cloudsync_pk_decode_bind_context *ctx) {
     484            0 :     return ctx->db_version;
     485              : }
     486              : 
     487              : // MARK: - CloudSync Context -
     488              : 
     489        12663 : int cloudsync_insync (cloudsync_context *data) {
     490        12663 :     return data->insync;
     491              : }
     492              : 
     493          797 : void *cloudsync_siteid (cloudsync_context *data) {
     494          797 :     return (void *)data->site_id;
     495              : }
     496              : 
     497            2 : void cloudsync_reset_siteid (cloudsync_context *data) {
     498            2 :     memset(data->site_id, 0, sizeof(uint8_t) * UUID_LEN);
     499            2 : }
     500              : 
     501          229 : int cloudsync_load_siteid (cloudsync_context *data) {
     502              :     // check if site_id was already loaded
     503          229 :     if (data->site_id[0] != 0) return DBRES_OK;
     504              :     
     505              :     // load site_id
     506          227 :     char *buffer = NULL;
     507          227 :     int64_t size = 0;
     508          227 :     int rc = database_select_blob(data, SQL_SITEID_SELECT_ROWID0, &buffer, &size);
     509          227 :     if (rc != DBRES_OK) return rc;
     510          227 :     if (!buffer || size != UUID_LEN) {
     511            0 :         if (buffer) cloudsync_memory_free(buffer);
     512            0 :         return cloudsync_set_error(data, "Unable to retrieve siteid", DBRES_MISUSE);
     513              :     }
     514              :     
     515          227 :     memcpy(data->site_id, buffer, UUID_LEN);
     516          227 :     cloudsync_memory_free(buffer);
     517              :     
     518          227 :     return DBRES_OK;
     519          229 : }
     520              : 
     521            2 : int64_t cloudsync_dbversion (cloudsync_context *data) {
     522            2 :     return data->db_version;
     523              : }
     524              : 
     525        12140 : int cloudsync_bumpseq (cloudsync_context *data) {
     526        12140 :     int value = data->seq;
     527        12140 :     data->seq += 1;
     528        12140 :     return value;
     529              : }
     530              : 
     531          286 : void cloudsync_update_schema_hash (cloudsync_context *data) {
     532          286 :     database_update_schema_hash(data, &data->schema_hash);
     533          286 : }
     534              : 
     535        62337 : void *cloudsync_db (cloudsync_context *data) {
     536        62337 :     return data->db;
     537              : }
     538              : 
     539          513 : int cloudsync_add_dbvms (cloudsync_context *data) {
     540              :     DEBUG_DBFUNCTION("cloudsync_add_stmts");
     541              :     
     542          513 :     if (data->data_version_stmt == NULL) {
     543          227 :         int rc = databasevm_prepare(data, SQL_DATA_VERSION, (void **)&data->data_version_stmt, DBFLAG_PERSISTENT);
     544              :         DEBUG_STMT("data_version_stmt %p", data->data_version_stmt);
     545          227 :         if (rc != DBRES_OK) return rc;
     546              :         DEBUG_SQL("data_version_stmt: %s", SQL_DATA_VERSION);
     547          227 :     }
     548              :     
     549          513 :     if (data->schema_version_stmt == NULL) {
     550          227 :         int rc = databasevm_prepare(data, SQL_SCHEMA_VERSION, (void **)&data->schema_version_stmt, DBFLAG_PERSISTENT);
     551              :         DEBUG_STMT("schema_version_stmt %p", data->schema_version_stmt);
     552          227 :         if (rc != DBRES_OK) return rc;
     553              :         DEBUG_SQL("schema_version_stmt: %s", SQL_SCHEMA_VERSION);
     554          227 :     }
     555              :     
     556          513 :     if (data->getset_siteid_stmt == NULL) {
     557              :         // get and set index of the site_id
     558              :         // in SQLite, we can’t directly combine an INSERT and a SELECT to both insert a row and return an identifier (rowid) in a single statement,
     559              :         // however, we can use a workaround by leveraging the INSERT statement with ON CONFLICT DO UPDATE and then combining it with RETURNING rowid
     560          227 :         int rc = databasevm_prepare(data, SQL_SITEID_GETSET_ROWID_BY_SITEID, (void **)&data->getset_siteid_stmt, DBFLAG_PERSISTENT);
     561              :         DEBUG_STMT("getset_siteid_stmt %p", data->getset_siteid_stmt);
     562          227 :         if (rc != DBRES_OK) return rc;
     563              :         DEBUG_SQL("getset_siteid_stmt: %s", SQL_SITEID_GETSET_ROWID_BY_SITEID);
     564          227 :     }
     565              :     
     566          513 :     return cloudsync_dbversion_rebuild(data);
     567          513 : }
     568              : 
     569           23 : int cloudsync_set_error (cloudsync_context *data, const char *err_user, int err_code) {
     570              :     // force err_code to be something different than OK
     571           23 :     if (err_code == DBRES_OK) err_code = database_errcode(data);
     572           23 :     if (err_code == DBRES_OK) err_code = DBRES_ERROR;
     573              :     
     574              :     // compute a meaningful error message
     575           23 :     if (err_user == NULL) {
     576            6 :         snprintf(data->errmsg, sizeof(data->errmsg), "%s", database_errmsg(data));
     577            6 :     } else {
     578           17 :         const char *db_error = database_errmsg(data);
     579              :         char db_error_copy[sizeof(data->errmsg)];
     580           17 :         int rc = database_errcode(data);
     581           17 :         if (rc == DBRES_OK) {
     582           17 :             snprintf(data->errmsg, sizeof(data->errmsg), "%s", err_user);
     583           17 :         } else {
     584            0 :             if (db_error == data->errmsg) {
     585            0 :                 snprintf(db_error_copy, sizeof(db_error_copy), "%s", db_error);
     586            0 :                 db_error = db_error_copy;
     587            0 :             }
     588            0 :             snprintf(data->errmsg, sizeof(data->errmsg), "%s (%s)", err_user, db_error);
     589              :         }
     590              :     }
     591              :     
     592           23 :     data->errcode = err_code;
     593           23 :     return err_code;
     594              : }
     595              : 
     596            6 : int cloudsync_set_dberror (cloudsync_context *data) {
     597            6 :     return cloudsync_set_error(data, NULL, DBRES_OK);
     598              : }
     599              : 
     600           14 : const char *cloudsync_errmsg (cloudsync_context *data) {
     601           14 :     return data->errmsg;
     602              : }
     603              : 
     604            4 : int cloudsync_errcode (cloudsync_context *data) {
     605            4 :     return data->errcode;
     606              : }
     607              : 
     608            2 : void cloudsync_reset_error (cloudsync_context *data) {
     609            2 :     data->errmsg[0] = 0;
     610            2 :     data->errcode = DBRES_OK;
     611            2 : }
     612              : 
     613            3 : void *cloudsync_auxdata (cloudsync_context *data) {
     614            3 :     return data->aux_data;
     615              : }
     616              : 
     617            2 : void cloudsync_set_auxdata (cloudsync_context *data, void *xdata) {
     618            2 :     data->aux_data = xdata;
     619            2 : }
     620              : 
     621            6 : void cloudsync_set_schema (cloudsync_context *data, const char *schema) {
     622            6 :     if (data->current_schema && schema && strcmp(data->current_schema, schema) == 0) return;
     623            5 :     if (data->current_schema) cloudsync_memory_free(data->current_schema);
     624            5 :     data->current_schema = NULL;
     625            5 :     if (schema) data->current_schema = cloudsync_string_dup_lowercase(schema);
     626            6 : }
     627              : 
     628         1289 : const char *cloudsync_schema (cloudsync_context *data) {
     629         1289 :     return data->current_schema;
     630              : }
     631              : 
     632            4 : const char *cloudsync_table_schema (cloudsync_context *data, const char *table_name) {
     633            4 :     cloudsync_table_context *table = table_lookup(data, table_name);
     634            4 :     if (!table) return NULL;
     635              : 
     636            1 :     return table->schema;
     637            4 : }
     638              : 
     639              : // MARK: - Table Utils -
     640              : 
     641           69 : void table_pknames_free (char **names, int nrows) {
     642           69 :     if (!names) return;
     643          132 :     for (int i = 0; i < nrows; ++i) {cloudsync_memory_free(names[i]);}
     644           46 :     cloudsync_memory_free(names);
     645           69 : }
     646              : 
     647          284 : char *table_build_mergedelete_sql (cloudsync_table_context *table) {
     648              :     #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
     649              :     if (table->rowid_only) {
     650              :         char *sql = memory_mprintf(SQL_DELETE_ROW_BY_ROWID, table->name);
     651              :         return sql;
     652              :     }
     653              :     #endif
     654              : 
     655          284 :     return sql_build_delete_by_pk(table->context, table->name, table->schema);
     656              : }
     657              : 
     658         1367 : char *table_build_mergeinsert_sql (cloudsync_table_context *table, const char *colname) {
     659         1367 :     char *sql = NULL;
     660              :     
     661              :     #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
     662              :     if (table->rowid_only) {
     663              :         if (colname == NULL) {
     664              :             // INSERT OR IGNORE INTO customers (first_name,last_name) VALUES (?,?);
     665              :             sql = memory_mprintf(SQL_INSERT_ROWID_IGNORE, table->name);
     666              :         } else {
     667              :             // INSERT INTO customers (first_name,last_name,age) VALUES (?,?,?) ON CONFLICT DO UPDATE SET age=?;
     668              :             sql = memory_mprintf(SQL_UPSERT_ROWID_AND_COL_BY_ROWID, table->name, colname, colname);
     669              :         }
     670              :         return sql;
     671              :     }
     672              :     #endif
     673              :     
     674         1367 :     if (colname == NULL) {
     675              :         // is sentinel insert
     676          284 :         sql = sql_build_insert_pk_ignore(table->context, table->name, table->schema);
     677          284 :     } else {
     678         1083 :         sql = sql_build_upsert_pk_and_col(table->context, table->name, colname, table->schema);
     679              :     }
     680         1367 :     return sql;
     681              : }
     682              : 
     683         1082 : char *table_build_value_sql (cloudsync_table_context *table, const char *colname) {
     684              :     #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
     685              :     if (table->rowid_only) {
     686              :         char *colnamequote = "\"";
     687              :         char *sql = memory_mprintf(SQL_SELECT_COLS_BY_ROWID_FMT, colnamequote, colname, colnamequote, table->name);
     688              :         return sql;
     689              :     }
     690              :     #endif
     691              :         
     692              :     // SELECT age FROM customers WHERE first_name=? AND last_name=?;
     693         1082 :     return sql_build_select_cols_by_pk(table->context, table->name, colname, table->schema);
     694              : }
     695              :     
     696          284 : cloudsync_table_context *table_create (cloudsync_context *data, const char *name, table_algo algo) {
     697              :     DEBUG_DBFUNCTION("table_create %s", name);
     698              :     
     699          284 :     cloudsync_table_context *table = (cloudsync_table_context *)cloudsync_memory_zeroalloc(sizeof(cloudsync_table_context));
     700          284 :     if (!table) return NULL;
     701              :     
     702          284 :     table->context = data;
     703          284 :     table->algo = algo;
     704          284 :     table->name = cloudsync_string_dup_lowercase(name);
     705              : 
     706              :     // Detect schema from metadata table location. If metadata table doesn't
     707              :     // exist yet (during initialization), fall back to cloudsync_schema() which
     708              :     // returns the explicitly set schema or current_schema().
     709          284 :     table->schema = database_table_schema(name);
     710          284 :     if (!table->schema) {
     711          284 :         const char *fallback_schema = cloudsync_schema(data);
     712          284 :         if (fallback_schema) {
     713            0 :             table->schema = cloudsync_string_dup(fallback_schema);
     714            0 :         }
     715          284 :     }
     716              : 
     717          284 :     if (!table->name) {
     718            0 :         cloudsync_memory_free(table);
     719            0 :         return NULL;
     720              :     }
     721          284 :     table->meta_ref = database_build_meta_ref(table->schema, table->name);
     722          284 :     table->base_ref = database_build_base_ref(table->schema, table->name);
     723          284 :     table->enabled = true;
     724              :         
     725          284 :     return table;
     726          284 : }
     727              : 
     728          284 : void table_free (cloudsync_table_context *table) {
     729              :     DEBUG_DBFUNCTION("table_free %s", (table) ? (table->name) : "NULL");
     730          284 :     if (!table) return;
     731              :     
     732          284 :     if (table->col_name) {
     733         1327 :         for (int i=0; i<table->ncols; ++i) {
     734         1082 :             cloudsync_memory_free(table->col_name[i]);
     735         1082 :         }
     736          245 :         cloudsync_memory_free(table->col_name);
     737          245 :     }
     738          284 :     if (table->col_merge_stmt) {
     739         1327 :         for (int i=0; i<table->ncols; ++i) {
     740         1082 :             databasevm_finalize(table->col_merge_stmt[i]);
     741         1082 :         }
     742          245 :         cloudsync_memory_free(table->col_merge_stmt);
     743          245 :     }
     744          284 :     if (table->col_value_stmt) {
     745         1327 :         for (int i=0; i<table->ncols; ++i) {
     746         1082 :             databasevm_finalize(table->col_value_stmt[i]);
     747         1082 :         }
     748          245 :         cloudsync_memory_free(table->col_value_stmt);
     749          245 :     }
     750          284 :     if (table->col_id) {
     751          245 :         cloudsync_memory_free(table->col_id);
     752          245 :     }
     753          284 :     if (table->col_algo) {
     754          245 :         cloudsync_memory_free(table->col_algo);
     755          245 :     }
     756          284 :     if (table->col_delimiter) {
     757         1327 :         for (int i=0; i<table->ncols; ++i) {
     758         1082 :             if (table->col_delimiter[i]) cloudsync_memory_free(table->col_delimiter[i]);
     759         1082 :         }
     760          245 :         cloudsync_memory_free(table->col_delimiter);
     761          245 :     }
     762              : 
     763          284 :     if (table->block_value_read_stmt) databasevm_finalize(table->block_value_read_stmt);
     764          284 :     if (table->block_value_write_stmt) databasevm_finalize(table->block_value_write_stmt);
     765          284 :     if (table->block_value_delete_stmt) databasevm_finalize(table->block_value_delete_stmt);
     766          284 :     if (table->block_list_stmt) databasevm_finalize(table->block_list_stmt);
     767          284 :     if (table->blocks_ref) cloudsync_memory_free(table->blocks_ref);
     768              : 
     769          284 :     if (table->name) cloudsync_memory_free(table->name);
     770          284 :     if (table->schema) cloudsync_memory_free(table->schema);
     771          284 :     if (table->meta_ref) cloudsync_memory_free(table->meta_ref);
     772          284 :     if (table->base_ref) cloudsync_memory_free(table->base_ref);
     773          284 :     if (table->pk_name) table_pknames_free(table->pk_name, table->npks);
     774          284 :     if (table->meta_pkexists_stmt) databasevm_finalize(table->meta_pkexists_stmt);
     775          284 :     if (table->meta_sentinel_update_stmt) databasevm_finalize(table->meta_sentinel_update_stmt);
     776          284 :     if (table->meta_sentinel_insert_stmt) databasevm_finalize(table->meta_sentinel_insert_stmt);
     777          284 :     if (table->meta_row_insert_update_stmt) databasevm_finalize(table->meta_row_insert_update_stmt);
     778          284 :     if (table->meta_row_drop_stmt) databasevm_finalize(table->meta_row_drop_stmt);
     779          284 :     if (table->meta_update_move_stmt) databasevm_finalize(table->meta_update_move_stmt);
     780          284 :     if (table->meta_local_cl_stmt) databasevm_finalize(table->meta_local_cl_stmt);
     781          284 :     if (table->meta_winner_clock_stmt) databasevm_finalize(table->meta_winner_clock_stmt);
     782          284 :     if (table->meta_merge_delete_drop) databasevm_finalize(table->meta_merge_delete_drop);
     783          284 :     if (table->meta_zero_clock_stmt) databasevm_finalize(table->meta_zero_clock_stmt);
     784          284 :     if (table->meta_col_version_stmt) databasevm_finalize(table->meta_col_version_stmt);
     785          284 :     if (table->meta_site_id_stmt) databasevm_finalize(table->meta_site_id_stmt);
     786              :     
     787          284 :     if (table->real_col_values_stmt) databasevm_finalize(table->real_col_values_stmt);
     788          284 :     if (table->real_merge_delete_stmt) databasevm_finalize(table->real_merge_delete_stmt);
     789          284 :     if (table->real_merge_sentinel_stmt) databasevm_finalize(table->real_merge_sentinel_stmt);
     790              :     
     791          284 :     cloudsync_memory_free(table);
     792          284 : }
     793              : 
     794          284 : int table_add_stmts (cloudsync_table_context *table, int ncols) {
     795          284 :     int rc = DBRES_OK;
     796          284 :     char *sql = NULL;
     797          284 :     cloudsync_context *data = table->context;
     798              :     
     799              :     // META TABLE statements
     800              :     
     801              :     // CREATE TABLE IF NOT EXISTS \"%w_cloudsync\" (pk BLOB NOT NULL, col_name TEXT NOT NULL, col_version INTEGER, db_version INTEGER, site_id INTEGER DEFAULT 0, seq INTEGER, PRIMARY KEY (pk, col_name));
     802              :     
     803              :     // precompile the pk exists statement
     804              :     // we do not need an index on the pk column because it is already covered by the fact that it is part of the prikeys
     805              :     // EXPLAIN QUERY PLAN reports: SEARCH table_name USING PRIMARY KEY (pk=?)
     806          284 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_ROW_EXISTS_BY_PK, table->meta_ref);
     807          284 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     808              :     DEBUG_SQL("meta_pkexists_stmt: %s", sql);
     809              : 
     810          284 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_pkexists_stmt, DBFLAG_PERSISTENT);
     811          284 :     cloudsync_memory_free(sql);
     812          284 :     if (rc != DBRES_OK) goto cleanup;
     813              : 
     814              :     // precompile the update local sentinel statement
     815          284 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPDATE_COL_BUMP_VERSION, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
     816          284 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     817              :     DEBUG_SQL("meta_sentinel_update_stmt: %s", sql);
     818              :     
     819          284 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_sentinel_update_stmt, DBFLAG_PERSISTENT);
     820          284 :     cloudsync_memory_free(sql);
     821          284 :     if (rc != DBRES_OK) goto cleanup;
     822              :     
     823              :     // precompile the insert local sentinel statement
     824          284 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPSERT_COL_INIT_OR_BUMP_VERSION, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, table->meta_ref, table->meta_ref, table->meta_ref);
     825          284 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     826              :     DEBUG_SQL("meta_sentinel_insert_stmt: %s", sql);
     827              :     
     828          284 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_sentinel_insert_stmt, DBFLAG_PERSISTENT);
     829          284 :     cloudsync_memory_free(sql);
     830          284 :     if (rc != DBRES_OK) goto cleanup;
     831              : 
     832              :     // precompile the insert/update local row statement
     833          284 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPSERT_RAW_COLVERSION, table->meta_ref, table->meta_ref);
     834          284 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     835              :     DEBUG_SQL("meta_row_insert_update_stmt: %s", sql);
     836              :     
     837          284 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_row_insert_update_stmt, DBFLAG_PERSISTENT);
     838          284 :     cloudsync_memory_free(sql);
     839          284 :     if (rc != DBRES_OK) goto cleanup;
     840              :     
     841              :     // precompile the delete rows from meta
     842          284 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_DELETE_PK_EXCEPT_COL, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
     843          284 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     844              :     DEBUG_SQL("meta_row_drop_stmt: %s", sql);
     845              : 
     846          284 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_row_drop_stmt, DBFLAG_PERSISTENT);
     847          284 :     cloudsync_memory_free(sql);
     848          284 :     if (rc != DBRES_OK) goto cleanup;
     849              :     
     850              :     // precompile the update rows from meta when pk changes
     851              :     // see https://github.com/sqliteai/sqlite-sync/blob/main/docs/PriKey.md for more details
     852          284 :     sql = sql_build_rekey_pk_and_reset_version_except_col(data, table->name, CLOUDSYNC_TOMBSTONE_VALUE);
     853          284 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     854              :     DEBUG_SQL("meta_update_move_stmt: %s", sql);
     855              :     
     856          284 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_update_move_stmt, DBFLAG_PERSISTENT);
     857          284 :     cloudsync_memory_free(sql);
     858          284 :     if (rc != DBRES_OK) goto cleanup;
     859              :     
     860              :     // local cl
     861          284 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_GET_COL_VERSION_OR_ROW_EXISTS, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, table->meta_ref);
     862          284 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     863              :     DEBUG_SQL("meta_local_cl_stmt: %s", sql);
     864              :     
     865          284 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_local_cl_stmt, DBFLAG_PERSISTENT);
     866          284 :     cloudsync_memory_free(sql);
     867          284 :     if (rc != DBRES_OK) goto cleanup;
     868              :     
     869              :     // rowid of the last inserted/updated row in the meta table
     870          284 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_INSERT_RETURN_CHANGE_ID, table->meta_ref);
     871          284 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     872              :     DEBUG_SQL("meta_winner_clock_stmt: %s", sql);
     873              :     
     874          284 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_winner_clock_stmt, DBFLAG_PERSISTENT);
     875          284 :     cloudsync_memory_free(sql);
     876          284 :     if (rc != DBRES_OK) goto cleanup;
     877              :     
     878          284 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_DELETE_PK_EXCEPT_COL, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
     879          284 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     880              :     DEBUG_SQL("meta_merge_delete_drop: %s", sql);
     881              : 
     882          284 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_merge_delete_drop, DBFLAG_PERSISTENT);
     883          284 :     cloudsync_memory_free(sql);
     884          284 :     if (rc != DBRES_OK) goto cleanup;
     885              :     
     886              :     // zero clock
     887          284 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_TOMBSTONE_PK_EXCEPT_COL, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
     888          284 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     889              :     DEBUG_SQL("meta_zero_clock_stmt: %s", sql);
     890              :     
     891          284 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_zero_clock_stmt, DBFLAG_PERSISTENT);
     892          284 :     cloudsync_memory_free(sql);
     893          284 :     if (rc != DBRES_OK) goto cleanup;
     894              :     
     895              :     // col_version
     896          284 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_COL_VERSION_BY_PK_COL, table->meta_ref);
     897          284 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     898              :     DEBUG_SQL("meta_col_version_stmt: %s", sql);
     899              :     
     900          284 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_col_version_stmt, DBFLAG_PERSISTENT);
     901          284 :     cloudsync_memory_free(sql);
     902          284 :     if (rc != DBRES_OK) goto cleanup;
     903              :     
     904              :     // site_id
     905          284 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_SITE_ID_BY_PK_COL, table->meta_ref);
     906          284 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     907              :     DEBUG_SQL("meta_site_id_stmt: %s", sql);
     908              :     
     909          284 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_site_id_stmt, DBFLAG_PERSISTENT);
     910          284 :     cloudsync_memory_free(sql);
     911          284 :     if (rc != DBRES_OK) goto cleanup;
     912              :     
     913              :     // REAL TABLE statements
     914              : 
     915              :     // precompile the get column value statement
     916          284 :     if (ncols > 0) {
     917          245 :         sql = sql_build_select_nonpk_by_pk(data, table->name, table->schema);
     918          245 :         if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     919              :         DEBUG_SQL("real_col_values_stmt: %s", sql);
     920              :         
     921          245 :         rc = databasevm_prepare(data, sql, (void **)&table->real_col_values_stmt, DBFLAG_PERSISTENT);
     922          245 :         cloudsync_memory_free(sql);
     923          245 :         if (rc != DBRES_OK) goto cleanup;
     924          245 :     }
     925              :     
     926          284 :     sql = table_build_mergedelete_sql(table);
     927          284 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     928              :     DEBUG_SQL("real_merge_delete: %s", sql);
     929              :     
     930          284 :     rc = databasevm_prepare(data, sql, (void **)&table->real_merge_delete_stmt, DBFLAG_PERSISTENT);
     931          284 :     cloudsync_memory_free(sql);
     932          284 :     if (rc != DBRES_OK) goto cleanup;
     933              :     
     934          284 :     sql = table_build_mergeinsert_sql(table, NULL);
     935          284 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     936              :     DEBUG_SQL("real_merge_sentinel: %s", sql);
     937              :     
     938          284 :     rc = databasevm_prepare(data, sql, (void **)&table->real_merge_sentinel_stmt, DBFLAG_PERSISTENT);
     939          284 :     cloudsync_memory_free(sql);
     940          284 :     if (rc != DBRES_OK) goto cleanup;
     941              :     
     942              : cleanup:
     943          284 :     if (rc != DBRES_OK) DEBUG_ALWAYS("table_add_stmts error: %d %s\n", rc, database_errmsg(data));
     944          284 :     return rc;
     945              : }
     946              : 
     947       184072 : cloudsync_table_context *table_lookup (cloudsync_context *data, const char *table_name) {
     948              :     DEBUG_DBFUNCTION("table_lookup %s", table_name);
     949              :     
     950       184072 :     if (table_name) {
     951       185698 :         for (int i=0; i<data->tables_count; ++i) {
     952       185119 :             if ((strcasecmp(data->tables[i]->name, table_name) == 0)) return data->tables[i];
     953         1626 :         }
     954          579 :     }
     955              :     
     956          579 :     return NULL;
     957       184072 : }
     958              : 
     959       169267 : void *table_column_lookup (cloudsync_table_context *table, const char *col_name, bool is_merge, int *index) {
     960              :     DEBUG_DBFUNCTION("table_column_lookup %s", col_name);
     961              :     
     962       315425 :     for (int i=0; i<table->ncols; ++i) {
     963       315425 :         if (strcasecmp(table->col_name[i], col_name) == 0) {
     964       169267 :             if (index) *index = i;
     965       169267 :             return (is_merge) ? table->col_merge_stmt[i] : table->col_value_stmt[i];
     966              :         }
     967       146158 :     }
     968              :     
     969            0 :     if (index) *index = -1;
     970            0 :     return NULL;
     971       169267 : }
     972              : 
     973          283 : int table_remove (cloudsync_context *data, cloudsync_table_context *table) {
     974          283 :     const char *table_name = table->name;
     975              :     DEBUG_DBFUNCTION("table_remove %s", table_name);
     976              :     
     977          333 :     for (int i = 0; i < data->tables_count; ++i) {
     978          333 :         cloudsync_table_context *t = data->tables[i];
     979              :         
     980              :         // pointer compare is fastest but fallback to strcasecmp if not same pointer
     981          333 :         if ((t == table) || ((strcasecmp(t->name, table_name) == 0))) {
     982          283 :             int last = data->tables_count - 1;
     983          283 :             data->tables[i] = data->tables[last];   // move last into the hole (keeps array dense)
     984          283 :             data->tables[last] = NULL;              // NULLify tail (as an extra security measure)
     985          283 :             data->tables_count--;
     986          283 :             return data->tables_count;
     987              :         }
     988           50 :     }
     989              :     
     990            0 :     return -1;
     991          283 : }
     992              : 
                        // Row callback handed to database_exec_callback by table_add_to_context while
                        // the non-primary-key columns of a table are being registered.
                        // For the (name, cid) pair it receives, it stores the column id and a
                        // lowercased copy of the column name in slot table->ncols of the col_* arrays,
                        // prepares the per-column merge and value statements with DBFLAG_PERSISTENT,
                        // and finally increments table->ncols.
                        //   xdata  - the cloudsync_table_context being filled in
                        //   ncols  - number of entries in values, walked two at a time
                        //   values - values[i] is the column name, values[i+1] its cid as text
                        //   names  - not used
                        // Returns 0 on success and 1 on failure; on failure everything already stored
                        // in the slot being filled is released and the slot pointers are set to NULL.
                        // NOTE(review): index is computed once, before the loop, so a row carrying more
                        // than one (name, cid) pair would keep overwriting the same slot - presumably
                        // each row holds exactly one pair; confirm against the query text.
                        // NOTE(review): values[i+1] reaches strtol without a NULL check, and index is
                        // not compared with the size the col_* arrays were allocated with.
     993         1083 : int table_add_to_context_cb (void *xdata, int ncols, char **values, char **names) {
     994         1083 :     cloudsync_table_context *table = (cloudsync_table_context *)xdata;
     995         1083 :     cloudsync_context *data = table->context;
     996              : 
                            // the slot to fill is the current column count of the table
     997         1083 :     int index = table->ncols;
     998         2165 :     for (int i=0; i<ncols; i+=2) {
     999         1083 :         const char *name = values[i];
    1000         1083 :         int cid = (int)strtol(values[i+1], NULL, 0);
    1001              : 
    1002         1083 :         table->col_id[index] = cid;
    1003         1083 :         table->col_name[index] = cloudsync_string_dup_lowercase(name);
    1004         1083 :         if (!table->col_name[index]) goto error;
    1005              : 
                                // merge statement for this column; the SQL text is freed right after
                                // the prepare call, whatever its outcome
    1006         1083 :         char *sql = table_build_mergeinsert_sql(table, name);
    1007         1083 :         if (!sql) goto error;
    1008              :         DEBUG_SQL("col_merge_stmt[%d]: %s", index, sql);
    1009              : 
    1010         1083 :         int rc = databasevm_prepare(data, sql, (void **)&table->col_merge_stmt[index], DBFLAG_PERSISTENT);
    1011         1083 :         cloudsync_memory_free(sql);
    1012         1083 :         if (rc != DBRES_OK) goto error;
    1013         1082 :         if (!table->col_merge_stmt[index]) goto error;
    1014              : 
                                // value statement for this column, built and prepared the same way
    1015         1082 :         sql = table_build_value_sql(table, name);
    1016         1082 :         if (!sql) goto error;
    1017              :         DEBUG_SQL("col_value_stmt[%d]: %s", index, sql);
    1018              : 
    1019         1082 :         rc = databasevm_prepare(data, sql, (void **)&table->col_value_stmt[index], DBFLAG_PERSISTENT);
    1020         1082 :         cloudsync_memory_free(sql);
    1021         1082 :         if (rc != DBRES_OK) goto error;
    1022         1082 :         if (!table->col_value_stmt[index]) goto error;
    1023         1082 :     }
                            // the slot is complete: make it visible through the column count
    1024         1082 :     table->ncols += 1;
    1025              : 
    1026         1082 :     return 0;
    1027              : 
    1028              : error:
    1029              :     // clean up partially-initialized entry at index
    1030            1 :     if (table->col_name[index]) {cloudsync_memory_free(table->col_name[index]); table->col_name[index] = NULL;}
    1031            1 :     if (table->col_merge_stmt[index]) {databasevm_finalize(table->col_merge_stmt[index]); table->col_merge_stmt[index] = NULL;}
    1032            1 :     if (table->col_value_stmt[index]) {databasevm_finalize(table->col_value_stmt[index]); table->col_value_stmt[index] = NULL;}
    1033            1 :     return 1;
    1034         1083 : }
    1035              : 
                        // Guarantees that data->tables can hold at least one more entry.
                        // Returns true immediately when tables_count is below tables_cap. Otherwise the
                        // array is reallocated to twice its capacity (or to CLOUDSYNC_INIT_NTABLES when
                        // the capacity is still 0) and true is returned.
                        // Returns false when the reallocation fails; data->tables and data->tables_cap
                        // are not modified in that case.
                        // NOTE(review): the doubling is done in int without an overflow check.
    1036          284 : bool table_ensure_capacity (cloudsync_context *data) {
    1037          284 :     if (data->tables_count < data->tables_cap) return true;
    1038              :     
    1039            0 :     int new_cap = data->tables_cap ? data->tables_cap * 2 : CLOUDSYNC_INIT_NTABLES;
    1040            0 :     size_t bytes = (size_t)new_cap * sizeof(*data->tables);
                            // the result goes through a temporary so the old array survives a failure
    1041            0 :     void *p = cloudsync_memory_realloc(data->tables, bytes);
    1042            0 :     if (!p) return false;
    1043              :     
    1044            0 :     data->tables = (cloudsync_table_context **)p;
    1045            0 :     data->tables_cap = new_cap;
    1046            0 :     return true;
    1047          284 : }
    1048              : 
                        // Registers table_name in the context and prepares its per-table and
                        // per-column state.
                        // Steps: return early if table_lookup already finds the table; make room in
                        // data->tables; create the table context; count primary-key and non-primary-key
                        // columns; call table_add_stmts; allocate the per-column arrays; run the
                        // column-list query with table_add_to_context_cb filling one slot per row;
                        // append the table to data->tables.
                        // Returns true when the table was already registered or has been added, false
                        // otherwise. Once the table context exists, every failure path releases it
                        // with table_free.
                        // NOTE(review): after database_exec_callback only DBRES_ABORT is treated as a
                        // failure (presumably the code reported when the callback returns non-zero);
                        // any other non-OK result falls through and the table is still appended -
                        // confirm this is intended.
    1049          289 : bool table_add_to_context (cloudsync_context *data, table_algo algo, const char *table_name) {
    1050              :     DEBUG_DBFUNCTION("cloudsync_context_add_table %s", table_name);
    1051              : 
    1052              :     // Check if table already initialized in this connection's context.
    1053              :     // Note: This prevents same-connection duplicate initialization.
    1054              :     // SQLite clients cannot distinguish schemas, so having 'public.users'
    1055              :     // and 'auth.users' would cause sync ambiguity. Users should avoid
    1056              :     // initializing tables with the same name in different schemas.
    1057              :     // If two concurrent connections initialize tables with the same name
    1058              :     // in different schemas, the behavior is undefined.
    1059          289 :     cloudsync_table_context *table = table_lookup(data, table_name);
    1060          289 :     if (table) return true;
    1061              :     
    1062              :     // check for space availability
    1063          284 :     if (!table_ensure_capacity(data)) return false;
    1064              :     
    1065              :     // setup a new table
    1066          284 :     table = table_create(data, table_name, algo);
    1067          284 :     if (!table) return false;
    1068              :     
    1069              :     // fill remaining metadata in the table
    1070          284 :     int count = database_count_pk(data, table_name, false, table->schema);
    1071          284 :     if (count < 0) {cloudsync_set_dberror(data); goto abort_add_table;}
    1072          284 :     table->npks = count;
                            // a table without a declared primary key is either rejected or treated as
                            // keyed by rowid, depending on CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
                            // NOTE(review): the rejecting branch aborts without recording an error
    1073          284 :     if (table->npks == 0) {
    1074              :         #if CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
    1075            0 :         goto abort_add_table;
    1076              :         #else
    1077              :         table->rowid_only = true;
    1078              :         table->npks = 1; // rowid
    1079              :         #endif
    1080              :     }
    1081              :     
    1082          284 :     int ncols = database_count_nonpk(data, table_name, table->schema);
    1083          284 :     if (ncols < 0) {cloudsync_set_dberror(data); goto abort_add_table;}
    1084          284 :     int rc = table_add_stmts(table, ncols);
    1085          284 :     if (rc != DBRES_OK) goto abort_add_table;
    1086              :     
    1087              :     // a table with only pk(s) is totally legal
    1088          284 :     if (ncols > 0) {
                                // one zero-initialized array per column attribute, ncols entries each;
                                // table_add_to_context_cb fills col_name, col_id, col_merge_stmt and
                                // col_value_stmt slot by slot
    1089          245 :         table->col_name = (char **)cloudsync_memory_zeroalloc((uint64_t)(sizeof(char *) * ncols));
    1090          245 :         if (!table->col_name) goto abort_add_table;
    1091              : 
    1092          245 :         table->col_id = (int *)cloudsync_memory_zeroalloc((uint64_t)(sizeof(int) * ncols));
    1093          245 :         if (!table->col_id) goto abort_add_table;
    1094              : 
    1095          245 :         table->col_merge_stmt = (dbvm_t **)cloudsync_memory_zeroalloc((uint64_t)(sizeof(void *) * ncols));
    1096          245 :         if (!table->col_merge_stmt) goto abort_add_table;
    1097              : 
    1098          245 :         table->col_value_stmt = (dbvm_t **)cloudsync_memory_zeroalloc((uint64_t)(sizeof(void *) * ncols));
    1099          245 :         if (!table->col_value_stmt) goto abort_add_table;
    1100              : 
    1101          245 :         table->col_algo = (col_algo_t *)cloudsync_memory_zeroalloc((uint64_t)(sizeof(col_algo_t) * ncols));
    1102          245 :         if (!table->col_algo) goto abort_add_table;
    1103              : 
    1104          245 :         table->col_delimiter = (char **)cloudsync_memory_zeroalloc((uint64_t)(sizeof(char *) * ncols));
    1105          245 :         if (!table->col_delimiter) goto abort_add_table;
    1106              : 
    1107              :         // Pass empty string when schema is NULL; SQL will fall back to current_schema()
    1108          245 :         const char *schema = table->schema ? table->schema : "";
    1109          490 :         char *sql = cloudsync_memory_mprintf(SQL_PRAGMA_TABLEINFO_LIST_NONPK_NAME_CID,
    1110          245 :                                               table_name, schema, table_name, schema);
    1111          245 :         if (!sql) goto abort_add_table;
    1112          245 :         rc = database_exec_callback(data, sql, table_add_to_context_cb, (void *)table);
    1113          245 :         cloudsync_memory_free(sql);
    1114          245 :         if (rc == DBRES_ABORT) goto abort_add_table;
    1115          244 :     }
    1116              :     
    1117              :     // append newly created table
    1118          283 :     data->tables[data->tables_count++] = table;
    1119          283 :     return true;
    1120              :     
    1121              : abort_add_table:
    1122            1 :     table_free(table);
    1123            1 :     return false;
    1124          289 : }
    1125              : 
                        // Returns a statement for reading column values of tbl_name, or NULL when the
                        // table is not registered in the context.
                        // - Table with at least one non-primary-key column: returns whatever
                        //   table_column_lookup yields for the first column name and sets *persistent
                        //   to true.
                        // - Table with no such column: builds SQL with table_build_value_sql(table, "*"),
                        //   prepares it with flags 0 and sets *persistent to false.
                        // *persistent is always written (false when the table is unknown); presumably
                        // it tells the caller whether the statement is owned by the table context and
                        // must not be finalized - confirm with the callers.
                        // NOTE(review): in the second branch sql is not checked for NULL and the result
                        // of databasevm_prepare is ignored, so vm may come back NULL.
    1126            0 : dbvm_t *cloudsync_colvalue_stmt (cloudsync_context *data, const char *tbl_name, bool *persistent) {
    1127            0 :     dbvm_t *vm = NULL;
    1128            0 :     *persistent = false;
    1129              : 
    1130            0 :     cloudsync_table_context *table = table_lookup(data, tbl_name);
    1131            0 :     if (table) {
    1132            0 :         char *col_name = NULL;
    1133            0 :         if (table->ncols > 0) {
    1134            0 :             col_name = table->col_name[0];
    1135              :             // retrieve col_value precompiled statement
    1136            0 :             vm = table_column_lookup(table, col_name, false, NULL);
    1137            0 :             *persistent = true;
    1138            0 :         } else {
    1139            0 :             char *sql = table_build_value_sql(table, "*");
    1140            0 :             databasevm_prepare(data, sql, (void **)&vm, 0);
    1141            0 :             cloudsync_memory_free(sql);
    1142            0 :             *persistent = false;
    1143              :         }
    1144            0 :     }
    1145              :     
    1146            0 :     return vm;
    1147              : }
    1148              : 
                        // Returns the table's enabled flag.
    1149         6875 : bool table_enabled (cloudsync_table_context *table) {
    1150         6875 :     return table->enabled;
    1151              : }
    1152              : 
                        // Sets the table's enabled flag to value.
    1153            6 : void table_set_enabled (cloudsync_table_context *table, bool value) {
    1154            6 :     table->enabled = value;
    1155            6 : }
    1156              : 
                        // Returns table->ncols, the number of column slots filled in by
                        // table_add_to_context_cb (the table's non-primary-key columns).
    1157        19846 : int table_count_cols (cloudsync_table_context *table) {
    1158        19846 :     return table->ncols;
    1159              : }
    1160              : 
                        // Returns table->npks, the number of primary-key columns recorded for the table
                        // (table_add_to_context stores 1 for tables it treats as rowid-only).
    1161         6761 : int table_count_pks (cloudsync_table_context *table) {
    1162         6761 :     return table->npks;
    1163              : }
    1164              : 
                        // Returns the stored name of the column at index. The pointer refers to the
                        // string held in table->col_name, not to a copy.
                        // NOTE(review): index is not range-checked and col_name is not checked for
                        // NULL; callers must pass 0 <= index < table_count_cols(table).
    1165        32920 : const char *table_colname (cloudsync_table_context *table, int index) {
    1166        32920 :     return table->col_name[index];
    1167              : }
    1168              : 
                        // Returns true when dbvm_count on table->meta_pkexists_stmt, called with value
                        // and len as a DBTYPE_BLOB argument, reports a count greater than zero.
                        // value/len are presumably the encoded primary key - confirm with the callers.
    1169         3586 : bool table_pk_exists (cloudsync_table_context *table, const char *value, size_t len) {
    1170              :     // check if a row with the same primary key already exists
    1171              :     // if so, this means the row might have been previously deleted (sentinel)
    1172         3586 :     return (dbvm_count(table->meta_pkexists_stmt, value, len, DBTYPE_BLOB) > 0);
    1173              : }
    1174              : 
                        // Returns the table's pk_name array as stored (no copy is made).
    1175            0 : char **table_pknames (cloudsync_table_context *table) {
    1176            0 :     return table->pk_name;
    1177              : }
    1178              : 
                        // Replaces the table's pk_name array: the current array is released with
                        // table_pknames_free(table->pk_name, table->npks) and pknames is stored in its
                        // place. The table presumably takes ownership of pknames - confirm with the
                        // callers.
    1179           23 : void table_set_pknames (cloudsync_table_context *table, char **pknames) {
    1180           23 :     table_pknames_free(table->pk_name, table->npks);
    1181           23 :     table->pk_name = pknames;
    1182           23 : }
    1183              : 
                        // Returns true when the table's algorithm is table_algo_crdt_gos.
    1184        49240 : bool table_algo_isgos (cloudsync_table_context *table) {
    1185        49240 :     return (table->algo == table_algo_crdt_gos);
    1186              : }
    1187              : 
                        // Returns the table's schema string as stored; it may be NULL (other code in
                        // this file substitutes "" when it is).
    1188            0 : const char *table_schema (cloudsync_table_context *table) {
    1189            0 :     return table->schema;
    1190              : }
    1191              : 
    1192              : // MARK: - Merge Insert -
    1193              : 
                        // Runs table->meta_local_cl_stmt for the given encoded primary key and returns
                        // its result.
                        // The same pk blob is bound to parameters 1 and 2. Return value:
                        //   - column 0 of the row when the statement yields a row
                        //   - 0 when the statement finishes without a row
                        //   - -1 when a bind fails or the step returns any other code; the error is
                        //     then recorded with cloudsync_set_dberror
                        // The statement is reset before returning on every path.
                        // NOTE(review): -1 doubles as the error sentinel, so a row whose column 0 is -1
                        // would also be reported as an error.
    1194        46090 : int64_t merge_get_local_cl (cloudsync_table_context *table, const char *pk, int pklen) {
    1195        46090 :     dbvm_t *vm = table->meta_local_cl_stmt;
    1196        46090 :     int64_t result = -1;
    1197              :     
    1198        46090 :     int rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
    1199        46090 :     if (rc != DBRES_OK) goto cleanup;
    1200              :     
    1201        46090 :     rc = databasevm_bind_blob(vm, 2, (const void *)pk, pklen);
    1202        46090 :     if (rc != DBRES_OK) goto cleanup;
    1203              :     
    1204        46090 :     rc = databasevm_step(vm);
    1205        46090 :     if (rc == DBRES_ROW) result = database_column_int(vm, 0);
    1206            0 :     else if (rc == DBRES_DONE) result = 0;
    1207              :     
    1208              : cleanup:
    1209        46090 :     if (result == -1) cloudsync_set_dberror(table->context);
    1210        46090 :     dbvm_reset(vm);
    1211        46090 :     return result;
    1212              : }
    1213              : 
                        // Runs table->meta_col_version_stmt for (pk, col_name) and reports the value of
                        // column 0 through *version.
                        // The pk blob is bound to parameter 1 and col_name, as text, to parameter 2.
                        // Return value:
                        //   - DBRES_OK when a row was returned; *version holds column 0
                        //   - DBRES_DONE when the statement finished without a row; *version is left
                        //     untouched and no error is recorded
                        //   - any other code from a bind or the step; the error is recorded with
                        //     cloudsync_set_dberror and *version is left untouched
                        // The statement is reset before returning on every path.
    1214        45084 : int merge_get_col_version (cloudsync_table_context *table, const char *col_name, const char *pk, int pklen, int64_t *version) {
    1215        45084 :     dbvm_t *vm = table->meta_col_version_stmt;
    1216              :     
    1217        45084 :     int rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
    1218        45084 :     if (rc != DBRES_OK) goto cleanup;
    1219              :     
    1220        45084 :     rc = databasevm_bind_text(vm, 2, col_name, -1);
    1221        45084 :     if (rc != DBRES_OK) goto cleanup;
    1222              :     
    1223        45084 :     rc = databasevm_step(vm);
    1224        70045 :     if (rc == DBRES_ROW) {
    1225        24961 :         *version = database_column_int(vm, 0);
    1226        24961 :         rc = DBRES_OK;
    1227        24961 :     }
    1228              :     
    1229              : cleanup:
    1230        45084 :     if ((rc != DBRES_OK) && (rc != DBRES_DONE)) cloudsync_set_dberror(table->context);
    1231        45084 :     dbvm_reset(vm);
    1232        45084 :     return rc;
    1233              : }
    1234              : 
                        // Records the clock entry of a merged change and reports the resulting row id.
                        // Two statements are run in sequence:
                        //   1. data->getset_siteid_stmt with site_id bound as a blob; it must return a
                        //      row, whose column 0 is kept as ord.
                        //   2. table->meta_winner_clock_stmt with, in parameter order: pk (blob), the
                        //      column name (CLOUDSYNC_TOMBSTONE_VALUE when colname is NULL),
                        //      col_version, db_version, seq and ord. It must return a row, whose
                        //      column 0 is written to *rowid.
                        // Returns DBRES_OK on success. On any other outcome the bind/step code is
                        // returned, the error is recorded with cloudsync_set_dberror and *rowid is not
                        // written. Note that a step ending without a row (e.g. DBRES_DONE) is treated
                        // as a failure by both statements.
                        // vm always points at the statement currently in use, so the reset at
                        // cleanup_merge applies to whichever one was last touched.
    1235        25133 : int merge_set_winner_clock (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pk_len, const char *colname, int64_t col_version, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
    1236              :     
    1237              :     // get/set site_id
    1238        25133 :     dbvm_t *vm = data->getset_siteid_stmt;
    1239        25133 :     int rc = databasevm_bind_blob(vm, 1, (const void *)site_id, site_len);
    1240        25133 :     if (rc != DBRES_OK) goto cleanup_merge;
    1241              :     
    1242        25133 :     rc = databasevm_step(vm);
    1243        25133 :     if (rc != DBRES_ROW) goto cleanup_merge;
    1244              :     
    1245        25133 :     int64_t ord = database_column_int(vm, 0);
    1246        25133 :     dbvm_reset(vm);
    1247              :     
    1248        25133 :     vm = table->meta_winner_clock_stmt;
    1249        25133 :     rc = databasevm_bind_blob(vm, 1, (const void *)pk, pk_len);
    1250        25133 :     if (rc != DBRES_OK) goto cleanup_merge;
    1251              :     
    1252        25133 :     rc = databasevm_bind_text(vm, 2, (colname) ? colname : CLOUDSYNC_TOMBSTONE_VALUE, -1);
    1253        25133 :     if (rc != DBRES_OK) goto cleanup_merge;
    1254              :     
    1255        25133 :     rc = databasevm_bind_int(vm, 3, col_version);
    1256        25133 :     if (rc != DBRES_OK) goto cleanup_merge;
    1257              :     
    1258        25133 :     rc = databasevm_bind_int(vm, 4, db_version);
    1259        25133 :     if (rc != DBRES_OK) goto cleanup_merge;
    1260              :     
    1261        25133 :     rc = databasevm_bind_int(vm, 5, seq);
    1262        25133 :     if (rc != DBRES_OK) goto cleanup_merge;
    1263              :     
    1264        25133 :     rc = databasevm_bind_int(vm, 6, ord);
    1265        25133 :     if (rc != DBRES_OK) goto cleanup_merge;
    1266              :     
    1267        25133 :     rc = databasevm_step(vm);
    1268        50266 :     if (rc == DBRES_ROW) {
    1269        25133 :         *rowid = database_column_int(vm, 0);
    1270        25133 :         rc = DBRES_OK;
    1271        25133 :     }
    1272              :     
    1273              : cleanup_merge:
    1274        25133 :     if (rc != DBRES_OK) cloudsync_set_dberror(data);
    1275        25133 :     dbvm_reset(vm);
    1276        25133 :     return rc;
    1277              : }
    1278              : 
    1279              : // MARK: - Deferred column-batch merge functions -
    1280              : 
                        // Appends one winning column change to data->pending_batch so that it can be
                        // written later by merge_flush_pending.
                        // On the first entry of a batch (batch->table == NULL) the table is recorded
                        // and the primary key is copied into the batch. The entries array grows by
                        // doubling, starting at 8. The entry stores: a column-name pointer taken from
                        // the table context, a duplicate of col_value (NULL when col_value is NULL),
                        // col_version, db_version, seq and a copy of site_id.
                        // Returns DBRES_OK on success, or the value of cloudsync_set_error for
                        // DBRES_NOMEM (pk or entries allocation) / DBRES_ERROR (column not found in the
                        // table context).
                        // NOTE(review): data->pending_batch is dereferenced without a NULL check
                        // (merge_flush_pending does check it).
                        // NOTE(review): batch->table is set before the pk allocation, so if that
                        // allocation fails the batch keeps a table with a NULL pk and a later call
                        // would skip the first-entry setup.
                        // NOTE(review): the result of database_value_dup is not checked; if it returns
                        // NULL for a non-NULL col_value, merge_flush_pending will bind NULL for that
                        // column instead of reporting a failure.
    1281        21342 : static int merge_pending_add (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, const char *col_name, dbvalue_t *col_value, int64_t col_version, int64_t db_version, const char *site_id, int site_len, int64_t seq) {
    1282        21342 :     merge_pending_batch *batch = data->pending_batch;
    1283              : 
    1284              :     // Store table and PK on first entry
    1285        21342 :     if (batch->table == NULL) {
    1286         7702 :         batch->table = table;
    1287         7702 :         batch->pk = (char *)cloudsync_memory_alloc(pklen);
    1288         7702 :         if (!batch->pk) return cloudsync_set_error(data, "merge_pending_add: out of memory for pk", DBRES_NOMEM);
    1289         7702 :         memcpy(batch->pk, pk, pklen);
    1290         7702 :         batch->pk_len = pklen;
    1291         7702 :     }
    1292              : 
    1293              :     // Ensure capacity
    1294        21342 :     if (batch->count >= batch->capacity) {
    1295          502 :         int new_cap = batch->capacity ? batch->capacity * 2 : 8;
    1296          502 :         merge_pending_entry *new_entries = (merge_pending_entry *)cloudsync_memory_realloc(batch->entries, new_cap * sizeof(merge_pending_entry));
    1297          502 :         if (!new_entries) return cloudsync_set_error(data, "merge_pending_add: out of memory for entries", DBRES_NOMEM);
    1298          502 :         batch->entries = new_entries;
    1299          502 :         batch->capacity = new_cap;
    1300          502 :     }
    1301              : 
    1302              :     // Resolve col_name to a stable pointer from the table context
    1303              :     // (the incoming col_name may point to VM-owned memory that gets freed on reset)
    1304        21342 :     int col_idx = -1;
    1305        21342 :     table_column_lookup(table, col_name, true, &col_idx);
    1306        21342 :     const char *stable_col_name = (col_idx >= 0) ? table_colname(table, col_idx) : NULL;
    1307        21342 :     if (!stable_col_name) return cloudsync_set_error(data, "merge_pending_add: column not found in table context", DBRES_ERROR);
    1308              : 
    1309        21342 :     merge_pending_entry *e = &batch->entries[batch->count];
    1310        21342 :     e->col_name = stable_col_name;
    1311        21342 :     e->col_value = col_value ? (dbvalue_t *)database_value_dup(col_value) : NULL;
    1312        21342 :     e->col_version = col_version;
    1313        21342 :     e->db_version = db_version;
                            // the site id is clamped to the size of the entry's buffer before copying
    1314        21342 :     e->site_id_len = (site_len <= (int)sizeof(e->site_id)) ? site_len : (int)sizeof(e->site_id);
    1315        21342 :     memcpy(e->site_id, site_id, e->site_id_len);
    1316        21342 :     e->seq = seq;
    1317              : 
    1318        21342 :     batch->count++;
    1319        21342 :     return DBRES_OK;
    1320        21342 : }
    1321              : 
                        // Empties the batch so it can be reused for the next row.
                        // Frees the duplicated value of each of the first batch->count entries, frees
                        // the copied primary key, and resets table, pk_len, cl, sentinel_pending,
                        // row_exists and count. The entries array itself, its capacity and the
                        // cached_* fields are not touched here.
    1322        18908 : static void merge_pending_free_entries (merge_pending_batch *batch) {
    1323        18908 :     if (batch->entries) {
    1324        35582 :         for (int i = 0; i < batch->count; i++) {
    1325        21342 :             if (batch->entries[i].col_value) {
    1326        21342 :                 database_value_free(batch->entries[i].col_value);
    1327        21342 :                 batch->entries[i].col_value = NULL;
    1328        21342 :             }
    1329        21342 :         }
    1330        14240 :     }
    1331        18908 :     if (batch->pk) {
    1332         7734 :         cloudsync_memory_free(batch->pk);
    1333         7734 :         batch->pk = NULL;
    1334         7734 :     }
    1335        18908 :     batch->table = NULL;
    1336        18908 :     batch->pk_len = 0;
    1337        18908 :     batch->cl = 0;
    1338        18908 :     batch->sentinel_pending = false;
    1339        18908 :     batch->row_exists = false;
    1340        18908 :     batch->count = 0;
    1341        18908 : }
    1342              : 
                        // Writes the column changes buffered in data->pending_batch to the real table
                        // with a single statement, then records the winner clock of every entry.
                        // Flow:
                        //   1. No batch, or nothing buffered and no pending sentinel: nothing is
                        //      written (the batch is still emptied at cleanup).
                        //   2. A savepoint named "merge_flush" is opened; if that fails the flush goes
                        //      on without one.
                        //   3. No entries but a pending sentinel: the primary key is bound to
                        //      real_merge_sentinel_stmt and the statement is stepped.
                        //   4. Otherwise the cached statement is reused when row_exists, the entry
                        //      count and every column-name pointer match the cached ones; if not, the
                        //      old statement is finalized and a new UPDATE (row exists) or UPSERT
                        //      statement is built, prepared and cached.
                        //   5. Primary keys are bound first, column values after them; the statement is
                        //      stepped with SYNCBIT set and, for GOS tables, with table->enabled
                        //      cleared.
                        //   6. merge_set_winner_clock is called for each entry.
                        //   7. cleanup: the batch is always emptied; the savepoint, when opened, is
                        //      committed if rc is DBRES_OK and rolled back otherwise.
                        // Returns DBRES_OK on success or when there was nothing to do, otherwise the
                        // failing code.
                        // NOTE(review): in the sentinel branch a negative code from pk_decode_prikey is
                        // returned as is, while the multi-column branch maps it to DBRES_ERROR.
                        // NOTE(review): for GOS tables table->enabled is forced to 1 after the step
                        // rather than restored to its previous value.
    1343        18908 : static int merge_flush_pending (cloudsync_context *data) {
    1344        18908 :     merge_pending_batch *batch = data->pending_batch;
    1345        18908 :     if (!batch) return DBRES_OK;
    1346              : 
    1347        18908 :     int rc = DBRES_OK;
    1348        18908 :     bool flush_savepoint = false;
    1349              : 
    1350              :     // Nothing to write — handle sentinel-only case or skip
    1351        18908 :     if (batch->count == 0 && !(batch->sentinel_pending && batch->table)) {
    1352        11174 :         goto cleanup;
    1353              :     }
    1354              : 
    1355              :     // Wrap database operations in a savepoint so that on failure (e.g. RLS
    1356              :     // denial) the rollback properly releases all executor resources (open
    1357              :     // relations, snapshots, plan cache) acquired during the failed statement.
    1358         7734 :     flush_savepoint = (database_begin_savepoint(data, "merge_flush") == DBRES_OK);
    1359              : 
    1360         7734 :     if (batch->count == 0) {
    1361              :         // Sentinel with no winning columns (PK-only row)
    1362           30 :         dbvm_t *vm = batch->table->real_merge_sentinel_stmt;
    1363           30 :         rc = pk_decode_prikey(batch->pk, (size_t)batch->pk_len, pk_decode_bind_callback, vm);
    1364           30 :         if (rc < 0) {
    1365            0 :             cloudsync_set_dberror(data);
    1366            0 :             dbvm_reset(vm);
    1367            0 :             goto cleanup;
    1368              :         }
    1369           30 :         SYNCBIT_SET(data);
    1370           30 :         rc = databasevm_step(vm);
    1371           30 :         dbvm_reset(vm);
    1372           30 :         SYNCBIT_RESET(data);
    1373           30 :         if (rc == DBRES_DONE) rc = DBRES_OK;
    1374           30 :         if (rc != DBRES_OK) {
    1375            0 :             cloudsync_set_dberror(data);
    1376            0 :             goto cleanup;
    1377              :         }
    1378           30 :         goto cleanup;
    1379              :     }
    1380              : 
    1381              :     // Check if cached prepared statement can be reused
    1382         7704 :     cloudsync_table_context *table = batch->table;
    1383         7704 :     dbvm_t *vm = NULL;
    1384         7704 :     bool cache_hit = false;
    1385              : 
                            // column names are compared by pointer, not by content: merge_pending_add
                            // stores pointers taken from the table context, so equal pointers mean the
                            // same column of the same table
    1386        14601 :     if (batch->cached_vm &&
    1387         7202 :         batch->cached_row_exists == batch->row_exists &&
    1388         6897 :         batch->cached_col_count == batch->count) {
    1389         6870 :         cache_hit = true;
    1390        26559 :         for (int i = 0; i < batch->count; i++) {
    1391        19715 :             if (batch->cached_col_names[i] != batch->entries[i].col_name) {
    1392           26 :                 cache_hit = false;
    1393           26 :                 break;
    1394              :             }
    1395        19689 :         }
    1396         6870 :     }
    1397              : 
    1398         7704 :     if (cache_hit) {
    1399         6844 :         vm = batch->cached_vm;
    1400         6844 :         dbvm_reset(vm);
    1401         6844 :     } else {
    1402              :         // Invalidate old cache
    1403          860 :         if (batch->cached_vm) {
    1404          358 :             databasevm_finalize(batch->cached_vm);
    1405          358 :             batch->cached_vm = NULL;
    1406          358 :         }
    1407              : 
    1408              :         // Build multi-column SQL
    1409          860 :         const char **colnames = (const char **)cloudsync_memory_alloc(batch->count * sizeof(const char *));
    1410          860 :         if (!colnames) {
    1411            0 :             rc = cloudsync_set_error(data, "merge_flush_pending: out of memory", DBRES_NOMEM);
    1412            0 :             goto cleanup;
    1413              :         }
    1414         2513 :         for (int i = 0; i < batch->count; i++) {
    1415         1653 :             colnames[i] = batch->entries[i].col_name;
    1416         1653 :         }
    1417              : 
    1418          860 :         char *sql = batch->row_exists
    1419          421 :             ? sql_build_update_pk_and_multi_cols(data, table->name, colnames, batch->count, table->schema)
    1420          439 :             : sql_build_upsert_pk_and_multi_cols(data, table->name, colnames, batch->count, table->schema);
    1421          860 :         cloudsync_memory_free(colnames);
    1422              : 
    1423          860 :         if (!sql) {
    1424            0 :             rc = cloudsync_set_error(data, "merge_flush_pending: unable to build multi-column upsert SQL", DBRES_ERROR);
    1425            0 :             goto cleanup;
    1426              :         }
    1427              : 
    1428          860 :         rc = databasevm_prepare(data, sql, &vm, 0);
    1429          860 :         cloudsync_memory_free(sql);
    1430          860 :         if (rc != DBRES_OK) {
    1431            0 :             rc = cloudsync_set_error(data, "merge_flush_pending: unable to prepare statement", rc);
    1432            0 :             goto cleanup;
    1433              :         }
    1434              : 
    1435              :         // Update cache
    1436          860 :         batch->cached_vm = vm;
    1437          860 :         batch->cached_row_exists = batch->row_exists;
    1438          860 :         batch->cached_col_count = batch->count;
    1439              :         // Reallocate cached_col_names if needed
                                // NOTE(review): cached_col_count has already been set to the new count;
                                // if this realloc fails, cached_col_names stays the previous array (or
                                // NULL) and the cache comparison in a later flush may index past it
    1440          860 :         if (batch->cached_col_count > 0) {
    1441          860 :             const char **new_names = (const char **)cloudsync_memory_realloc(
    1442          860 :                 batch->cached_col_names, batch->count * sizeof(const char *));
    1443          860 :             if (new_names) {
    1444         2513 :                 for (int i = 0; i < batch->count; i++) {
    1445         1653 :                     new_names[i] = batch->entries[i].col_name;
    1446         1653 :                 }
    1447          860 :                 batch->cached_col_names = new_names;
    1448          860 :             }
    1449          860 :         }
    1450              :     }
    1451              : 
    1452              :     // Bind PKs (positions 1..npks)
                            // a non-negative result is used as the number of bound primary-key values
    1453         7704 :     int npks = pk_decode_prikey(batch->pk, (size_t)batch->pk_len, pk_decode_bind_callback, vm);
    1454         7704 :     if (npks < 0) {
    1455            0 :         cloudsync_set_dberror(data);
    1456            0 :         dbvm_reset(vm);
    1457            0 :         rc = DBRES_ERROR;
    1458            0 :         goto cleanup;
    1459              :     }
    1460              : 
    1461              :     // Bind column values (positions npks+1..npks+count)
    1462        29046 :     for (int i = 0; i < batch->count; i++) {
    1463        21342 :         merge_pending_entry *e = &batch->entries[i];
    1464        21342 :         int bind_idx = npks + 1 + i;
    1465        21342 :         if (e->col_value) {
    1466        21342 :             rc = databasevm_bind_value(vm, bind_idx, e->col_value);
    1467        21342 :         } else {
    1468            0 :             rc = databasevm_bind_null(vm, bind_idx);
    1469              :         }
    1470        21342 :         if (rc != DBRES_OK) {
    1471            0 :             cloudsync_set_dberror(data);
    1472            0 :             dbvm_reset(vm);
    1473            0 :             goto cleanup;
    1474              :         }
    1475        21342 :     }
    1476              : 
    1477              :     // Execute with SYNCBIT and GOS handling
    1478         7704 :     if (table->algo == table_algo_crdt_gos) table->enabled = 0;
    1479         7704 :     SYNCBIT_SET(data);
    1480         7704 :     rc = databasevm_step(vm);
    1481         7704 :     dbvm_reset(vm);
    1482         7704 :     SYNCBIT_RESET(data);
    1483         7704 :     if (table->algo == table_algo_crdt_gos) table->enabled = 1;
    1484              : 
    1485         7704 :     if (rc != DBRES_DONE) {
    1486            3 :         cloudsync_set_dberror(data);
    1487            3 :         goto cleanup;
    1488              :     }
    1489         7701 :     rc = DBRES_OK;
    1490              : 
    1491              :     // Call merge_set_winner_clock for each buffered entry
                            // the rowid reported by merge_set_winner_clock is not used by this function
    1492         7701 :     int64_t rowid = 0;
    1493        29035 :     for (int i = 0; i < batch->count; i++) {
    1494        21334 :         merge_pending_entry *e = &batch->entries[i];
    1495        42668 :         int clock_rc = merge_set_winner_clock(data, table, batch->pk, batch->pk_len,
    1496        21334 :                                                e->col_name, e->col_version, e->db_version,
    1497        21334 :                                                (const char *)e->site_id, e->site_id_len,
    1498        21334 :                                                e->seq, &rowid);
    1499        21334 :         if (clock_rc != DBRES_OK) {
    1500            0 :             rc = clock_rc;
    1501            0 :             goto cleanup;
    1502              :         }
    1503        29035 :     }
    1504              : 
    1505              : cleanup:
                            // the batch is emptied on every path, including the failing ones
    1506        18908 :     merge_pending_free_entries(batch);
    1507        18908 :     if (flush_savepoint) {
    1508         7734 :         if (rc == DBRES_OK) database_commit_savepoint(data, "merge_flush");
    1509            3 :         else database_rollback_savepoint(data, "merge_flush");
    1510         7734 :     }
    1511        18908 :     return rc;
    1512        18908 : }
    1513              : 
                        // Applies a single column value to the real table through the table's
                        // precompiled per-column merge statement, then records the winner clock.
                        // The encoded primary key is bound first; the value (or NULL when col_value is
                        // NULL) is bound to both parameters that follow the primary keys, at positions
                        // npks+1 and npks+2. The statement is stepped with SYNCBIT set and, for GOS
                        // tables, with table->enabled cleared (see the comment before the step).
                        // Return value:
                        //   - the value of cloudsync_set_error with DBRES_MISUSE when no statement is
                        //     found for col_name
                        //   - the negative code of pk_decode_prikey when binding the key fails
                        //   - the failing bind code, or the step code when it is not DBRES_DONE
                        //   - otherwise the result of merge_set_winner_clock, which also fills *rowid
                        // NOTE(review): for GOS tables table->enabled is forced to 1 after the step
                        // rather than restored to its previous value.
    1514         3167 : int merge_insert_col (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *col_name, dbvalue_t *col_value, int64_t col_version, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
    1515              :     int index;
    1516         3167 :     dbvm_t *vm = table_column_lookup(table, col_name, true, &index);
    1517         3167 :     if (vm == NULL) return cloudsync_set_error(data, "Unable to retrieve column merge precompiled statement in merge_insert_col", DBRES_MISUSE);
    1518              :     
    1519              :     // INSERT INTO table (pk1, pk2, col_name) VALUES (?, ?, ?) ON CONFLICT DO UPDATE SET col_name=?;"
    1520              :     
    1521              :     // bind primary key(s)
    1522         3167 :     int rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
    1523         3167 :     if (rc < 0) {
    1524            0 :         cloudsync_set_dberror(data);
    1525            0 :         dbvm_reset(vm);
    1526            0 :         return rc;
    1527              :     }
    1528              :     
    1529              :     // bind value (always bind all expected parameters for correct prepared statement handling)
    1530         3167 :     if (col_value) {
    1531         3167 :         rc = databasevm_bind_value(vm, table->npks+1, col_value);
    1532         3167 :         if (rc == DBRES_OK) rc = databasevm_bind_value(vm, table->npks+2, col_value);
    1533         3167 :     } else {
    1534            0 :         rc = databasevm_bind_null(vm, table->npks+1);
    1535            0 :         if (rc == DBRES_OK) rc = databasevm_bind_null(vm, table->npks+2);
    1536              :     }
    1537         3167 :     if (rc != DBRES_OK) {
    1538            0 :         cloudsync_set_dberror(data);
    1539            0 :         dbvm_reset(vm);
    1540            0 :         return rc;
    1541              :     }
    1542              : 
    1543              :     // perform real operation and disable triggers
    1544              :     
    1545              :     // in case of GOS we reused the table->col_merge_stmt statement
    1546              :     // which looks like: INSERT INTO table (pk1, pk2, col_name) VALUES (?, ?, ?) ON CONFLICT DO UPDATE SET col_name=?;"
    1547              :     // but the UPDATE in the CONFLICT statement would return SQLITE_CONSTRAINT because the trigger raises the error
    1548              :     // the trick is to disable that trigger before executing the statement
    1549         3167 :     if (table->algo == table_algo_crdt_gos) table->enabled = 0;
    1550         3167 :     SYNCBIT_SET(data);
    1551         3167 :     rc = databasevm_step(vm);
    1552              :     DEBUG_MERGE("merge_insert(%02x%02x): %s (%d)", data->site_id[UUID_LEN-2], data->site_id[UUID_LEN-1], databasevm_sql(vm), rc);
    1553         3167 :     dbvm_reset(vm);
    1554         3167 :     SYNCBIT_RESET(data);
    1555         3167 :     if (table->algo == table_algo_crdt_gos) table->enabled = 1;
    1556              :     
    1557         3167 :     if (rc != DBRES_DONE) {
    1558            0 :         cloudsync_set_dberror(data);
    1559            0 :         return rc;
    1560              :     }
    1561              :     
    1562         3167 :     return merge_set_winner_clock(data, table, pk, pklen, col_name, col_version, db_version, site_id, site_len, seq, rowid);
    1563         3167 : }
    1564              : 
    1565          162 : int merge_delete (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, const char *colname, int64_t cl, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
    1566          162 :     int rc = DBRES_OK;
    1567              :     
    1568              :     // reset return value
    1569          162 :     *rowid = 0;
    1570              :     
    1571              :     // bind pk
    1572          162 :     dbvm_t *vm = table->real_merge_delete_stmt;
    1573          162 :     rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
    1574          162 :     if (rc < 0) {
    1575            0 :         rc = cloudsync_set_dberror(data);
    1576            0 :         dbvm_reset(vm);
    1577            0 :         return rc;
    1578              :     }
    1579              :     
    1580              :     // perform real operation and disable triggers
    1581          162 :     SYNCBIT_SET(data);
    1582          162 :     rc = databasevm_step(vm);
    1583              :     DEBUG_MERGE("merge_delete(%02x%02x): %s (%d)", data->site_id[UUID_LEN-2], data->site_id[UUID_LEN-1], databasevm_sql(vm), rc);
    1584          162 :     dbvm_reset(vm);
    1585          162 :     SYNCBIT_RESET(data);
    1586          162 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    1587          162 :     if (rc != DBRES_OK) {
    1588            0 :         cloudsync_set_dberror(data);
    1589            0 :         return rc;
    1590              :     }
    1591              :     
    1592          162 :     rc = merge_set_winner_clock(data, table, pk, pklen, colname, cl, db_version, site_id, site_len, seq, rowid);
    1593          162 :     if (rc != DBRES_OK) return rc;
    1594              :     
    1595              :     // drop clocks _after_ setting the winner clock so we don't lose track of the max db_version!!
    1596              :     // this must never come before `set_winner_clock`
    1597          162 :     vm = table->meta_merge_delete_drop;
    1598          162 :     rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
    1599          162 :     if (rc == DBRES_OK) rc = databasevm_step(vm);
    1600          162 :     dbvm_reset(vm);
    1601              :     
    1602          162 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    1603          162 :     if (rc != DBRES_OK) cloudsync_set_dberror(data);
    1604          162 :     return rc;
    1605          162 : }
    1606              : 
    1607           58 : int merge_zeroclock_on_resurrect(cloudsync_table_context *table, int64_t db_version, const char *pk, int pklen) {
    1608           58 :     dbvm_t *vm = table->meta_zero_clock_stmt;
    1609              :     
    1610           58 :     int rc = databasevm_bind_int(vm, 1, db_version);
    1611           58 :     if (rc != DBRES_OK) goto cleanup;
    1612              :     
    1613           58 :     rc = databasevm_bind_blob(vm, 2, (const void *)pk, pklen);
    1614           58 :     if (rc != DBRES_OK) goto cleanup;
    1615              :     
    1616           58 :     rc = databasevm_step(vm);
    1617           58 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    1618              :     
    1619              : cleanup:
    1620           58 :     if (rc != DBRES_OK) cloudsync_set_dberror(table->context);
    1621           58 :     dbvm_reset(vm);
    1622           58 :     return rc;
    1623              : }
    1624              : 
    1625              : // executed only if insert_cl == local_cl
    1626        45084 : int merge_did_cid_win (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, dbvalue_t *insert_value, const char *site_id, int site_len, const char *col_name, int64_t col_version, bool *didwin_flag) {
    1627              :     
    1628        45084 :     if (col_name == NULL) col_name = CLOUDSYNC_TOMBSTONE_VALUE;
    1629              :     
    1630              :     int64_t local_version;
    1631        45084 :     int rc = merge_get_col_version(table, col_name, pk, pklen, &local_version);
    1632        45084 :     if (rc == DBRES_DONE) {
    1633              :         // no rows returned, the incoming change wins if there's nothing there locally
    1634        20123 :         *didwin_flag = true;
    1635        20123 :         return DBRES_OK;
    1636              :     }
    1637        24961 :     if (rc != DBRES_OK) return rc;
    1638              :     
    1639              :     // rc == DBRES_OK, means that a row with a version exists
    1640        24961 :     if (local_version != col_version) {
    1641         1986 :         if (col_version > local_version) {*didwin_flag = true; return DBRES_OK;}
    1642          743 :         if (col_version < local_version) {*didwin_flag = false; return DBRES_OK;}
    1643            0 :     }
    1644              :     
    1645              :     // rc == DBRES_ROW and col_version == local_version, need to compare values
    1646              : 
    1647              :     // retrieve col_value precompiled statement
    1648        22975 :     bool is_block_col = block_is_block_colname(col_name) && table_has_block_cols(table);
    1649              :     dbvm_t *vm;
    1650        22975 :     if (is_block_col) {
    1651              :         // Block column: read value from blocks table (pk + col_name bindings)
    1652           43 :         vm = table_block_value_read_stmt(table);
    1653           43 :         if (!vm) return cloudsync_set_error(data, "Unable to retrieve block value read statement in merge_did_cid_win", DBRES_ERROR);
    1654           43 :         rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
    1655           43 :         if (rc != DBRES_OK) { dbvm_reset(vm); return cloudsync_set_dberror(data); }
    1656           43 :         rc = databasevm_bind_text(vm, 2, col_name, -1);
    1657           43 :         if (rc != DBRES_OK) { dbvm_reset(vm); return cloudsync_set_dberror(data); }
    1658           43 :     } else {
    1659        22932 :         vm = table_column_lookup(table, col_name, false, NULL);
    1660        22932 :         if (!vm) return cloudsync_set_error(data, "Unable to retrieve column value precompiled statement in merge_did_cid_win", DBRES_ERROR);
    1661              : 
    1662              :         // bind primary key values
    1663        22932 :         rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, (void *)vm);
    1664        22932 :         if (rc < 0) {
    1665            0 :             rc = cloudsync_set_dberror(data);
    1666            0 :             dbvm_reset(vm);
    1667            0 :             return rc;
    1668              :         }
    1669              :     }
    1670              :         
    1671              :     // execute vm
    1672              :     dbvalue_t *local_value;
    1673        22975 :     rc = databasevm_step(vm);
    1674        22975 :     if (rc == DBRES_DONE) {
    1675              :         // meta entry exists but the actual value is missing
    1676              :         // we should allow the value_compare function to make a decision
    1677              :         // value_compare has been modified to handle the case where lvalue is NULL
    1678            2 :         local_value = NULL;
    1679            2 :         rc = DBRES_OK;
    1680        22975 :     } else if (rc == DBRES_ROW) {
    1681        22973 :         local_value = database_column_value(vm, 0);
    1682        22973 :         rc = DBRES_OK;
    1683        22973 :     } else {
    1684            0 :         goto cleanup;
    1685              :     }
    1686              :     
    1687              :     // compare values
    1688        22975 :     int ret = dbutils_value_compare(insert_value, local_value);
    1689              :     // reset after compare, otherwise local value would be deallocated
    1690        22975 :     dbvm_reset(vm);
    1691        22975 :     vm = NULL;
    1692              :     
    1693        22975 :     bool compare_site_id = (ret == 0 && data->merge_equal_values == true);
    1694        22975 :     if (!compare_site_id) {
    1695        22975 :         *didwin_flag = (ret > 0);
    1696        22975 :         goto cleanup;
    1697              :     }
    1698              :     
    1699              :     // values are the same and merge_equal_values is true
    1700            0 :     vm = table->meta_site_id_stmt;
    1701            0 :     rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
    1702            0 :     if (rc != DBRES_OK) goto cleanup;
    1703              :     
    1704            0 :     rc = databasevm_bind_text(vm, 2, col_name, -1);
    1705            0 :     if (rc != DBRES_OK) goto cleanup;
    1706              :     
    1707            0 :     rc = databasevm_step(vm);
    1708            0 :     if (rc == DBRES_ROW) {
    1709            0 :         const void *local_site_id = database_column_blob(vm, 0, NULL);
    1710            0 :         if (!local_site_id) {
    1711            0 :             dbvm_reset(vm);
    1712            0 :             return cloudsync_set_error(data, "NULL site_id in cloudsync table, table is probably corrupted", DBRES_ERROR);
    1713              :         }
    1714            0 :         ret = memcmp(site_id, local_site_id, site_len);
    1715            0 :         *didwin_flag = (ret > 0);
    1716            0 :         dbvm_reset(vm);
    1717            0 :         return DBRES_OK;
    1718              :     }
    1719              :     
    1720              :     // handle error condition here
    1721            0 :     dbvm_reset(vm);
    1722            0 :     return cloudsync_set_error(data, "Unable to find site_id for previous change, cloudsync table is probably corrupted", DBRES_ERROR);
    1723              :     
    1724              : cleanup:
    1725        22975 :     if (rc != DBRES_OK) cloudsync_set_dberror(data);
    1726        22975 :     dbvm_reset(vm);
    1727        22975 :     return rc;
    1728        45084 : }
    1729              : 
    1730           58 : int merge_sentinel_only_insert (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, int64_t cl, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
    1731              : 
    1732              :     // reset return value
    1733           58 :     *rowid = 0;
    1734              : 
    1735           58 :     if (data->pending_batch == NULL) {
    1736              :         // Immediate mode: execute base table INSERT
    1737            0 :         dbvm_t *vm = table->real_merge_sentinel_stmt;
    1738            0 :         int rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
    1739            0 :         if (rc < 0) {
    1740            0 :             rc = cloudsync_set_dberror(data);
    1741            0 :             dbvm_reset(vm);
    1742            0 :             return rc;
    1743              :         }
    1744              : 
    1745            0 :         SYNCBIT_SET(data);
    1746            0 :         rc = databasevm_step(vm);
    1747            0 :         dbvm_reset(vm);
    1748            0 :         SYNCBIT_RESET(data);
    1749            0 :         if (rc == DBRES_DONE) rc = DBRES_OK;
    1750            0 :         if (rc != DBRES_OK) {
    1751            0 :             cloudsync_set_dberror(data);
    1752            0 :             return rc;
    1753              :         }
    1754            0 :     } else {
    1755              :         // Batch mode: skip base table INSERT, the batch flush will create the row
    1756           58 :         merge_pending_batch *batch = data->pending_batch;
    1757           58 :         batch->sentinel_pending = true;
    1758           58 :         if (batch->table == NULL) {
    1759           32 :             batch->table = table;
    1760           32 :             batch->pk = (char *)cloudsync_memory_alloc(pklen);
    1761           32 :             if (!batch->pk) return cloudsync_set_error(data, "merge_sentinel_only_insert: out of memory for pk", DBRES_NOMEM);
    1762           32 :             memcpy(batch->pk, pk, pklen);
    1763           32 :             batch->pk_len = pklen;
    1764           32 :         }
    1765              :     }
    1766              : 
    1767              :     // Metadata operations always execute regardless of batch mode
    1768           58 :     int rc = merge_zeroclock_on_resurrect(table, db_version, pk, pklen);
    1769           58 :     if (rc != DBRES_OK) return rc;
    1770              : 
    1771           58 :     return merge_set_winner_clock(data, table, pk, pklen, NULL, cl, db_version, site_id, site_len, seq, rowid);
    1772           58 : }
    1773              : 
    1774              : // MARK: - Block-level merge helpers -
    1775              : 
    1776              : // Store a block value in the blocks table
    1777          379 : static int block_store_value (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *block_colname, dbvalue_t *col_value) {
    1778          379 :     dbvm_t *vm = table->block_value_write_stmt;
    1779          379 :     if (!vm) return cloudsync_set_error(data, "block_store_value: blocks table not initialized", DBRES_MISUSE);
    1780              : 
    1781          379 :     int rc = databasevm_bind_blob(vm, 1, pk, pklen);
    1782          379 :     if (rc != DBRES_OK) goto cleanup;
    1783          379 :     rc = databasevm_bind_text(vm, 2, block_colname, -1);
    1784          379 :     if (rc != DBRES_OK) goto cleanup;
    1785          379 :     if (col_value) {
    1786          379 :         rc = databasevm_bind_value(vm, 3, col_value);
    1787          379 :     } else {
    1788            0 :         rc = databasevm_bind_null(vm, 3);
    1789              :     }
    1790          379 :     if (rc != DBRES_OK) goto cleanup;
    1791              : 
    1792          379 :     rc = databasevm_step(vm);
    1793          379 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    1794              : 
    1795              : cleanup:
    1796          379 :     if (rc != DBRES_OK) cloudsync_set_dberror(data);
    1797          379 :     databasevm_reset(vm);
    1798          379 :     return rc;
    1799          379 : }
    1800              : 
    1801              : // Delete a block value from the blocks table
    1802           74 : static int block_delete_value (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *block_colname) {
    1803           74 :     dbvm_t *vm = table->block_value_delete_stmt;
    1804           74 :     if (!vm) return cloudsync_set_error(data, "block_delete_value: blocks table not initialized", DBRES_MISUSE);
    1805              : 
    1806           74 :     int rc = databasevm_bind_blob(vm, 1, pk, pklen);
    1807           74 :     if (rc != DBRES_OK) goto cleanup;
    1808           74 :     rc = databasevm_bind_text(vm, 2, block_colname, -1);
    1809           74 :     if (rc != DBRES_OK) goto cleanup;
    1810              : 
    1811           74 :     rc = databasevm_step(vm);
    1812           74 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    1813              : 
    1814              : cleanup:
    1815           74 :     if (rc != DBRES_OK) cloudsync_set_dberror(data);
    1816           74 :     databasevm_reset(vm);
    1817           74 :     return rc;
    1818           74 : }
    1819              : 
    1820              : // Materialize all alive blocks for a base column into the base table
    1821          462 : int block_materialize_column (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *base_col_name) {
    1822          462 :     if (!table->block_list_stmt) return cloudsync_set_error(data, "block_materialize_column: blocks table not initialized", DBRES_MISUSE);
    1823              : 
    1824              :     // Find column index and delimiter
    1825          462 :     int col_idx = -1;
    1826          468 :     for (int i = 0; i < table->ncols; i++) {
    1827          468 :         if (strcasecmp(table->col_name[i], base_col_name) == 0) {
    1828          462 :             col_idx = i;
    1829          462 :             break;
    1830              :         }
    1831            6 :     }
    1832          462 :     if (col_idx < 0) return cloudsync_set_error(data, "block_materialize_column: column not found", DBRES_ERROR);
    1833          462 :     const char *delimiter = table->col_delimiter[col_idx] ? table->col_delimiter[col_idx] : BLOCK_DEFAULT_DELIMITER;
    1834              : 
    1835              :     // Build the LIKE pattern for block col_names: "base_col\x1F%"
    1836          462 :     char *like_pattern = block_build_colname(base_col_name, "%");
    1837          462 :     if (!like_pattern) return DBRES_NOMEM;
    1838              : 
    1839              :     // Query alive blocks from blocks table joined with metadata
    1840              :     // block_list_stmt: SELECT b.col_value FROM blocks b JOIN meta m
    1841              :     //   ON b.pk = m.pk AND b.col_name = m.col_name
    1842              :     //   WHERE b.pk = ? AND b.col_name LIKE ? AND m.col_version % 2 = 1
    1843              :     //   ORDER BY b.col_name
    1844          462 :     dbvm_t *vm = table->block_list_stmt;
    1845          462 :     int rc = databasevm_bind_blob(vm, 1, pk, pklen);
    1846          462 :     if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
    1847          462 :     rc = databasevm_bind_text(vm, 2, like_pattern, -1);
    1848          462 :     if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
    1849              :     // Bind pk again for the join condition (parameter 3)
    1850          462 :     rc = databasevm_bind_blob(vm, 3, pk, pklen);
    1851          462 :     if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
    1852          462 :     rc = databasevm_bind_text(vm, 4, like_pattern, -1);
    1853          462 :     if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
    1854              : 
    1855              :     // Collect block values
    1856          462 :     const char **block_values = NULL;
    1857          462 :     int block_count = 0;
    1858          462 :     int block_cap = 0;
    1859              : 
    1860        22695 :     while ((rc = databasevm_step(vm)) == DBRES_ROW) {
    1861        22233 :         const char *value = database_column_text(vm, 0);
    1862        22233 :         if (block_count >= block_cap) {
    1863         1076 :             int new_cap = block_cap ? block_cap * 2 : 16;
    1864         1076 :             const char **new_arr = (const char **)cloudsync_memory_realloc((void *)block_values, (uint64_t)(new_cap * sizeof(char *)));
    1865         1076 :             if (!new_arr) { rc = DBRES_NOMEM; break; }
    1866         1076 :             block_values = new_arr;
    1867         1076 :             block_cap = new_cap;
    1868         1076 :         }
    1869        22233 :         block_values[block_count] = value ? cloudsync_string_dup(value) : cloudsync_string_dup("");
    1870        22233 :         block_count++;
    1871              :     }
    1872          462 :     databasevm_reset(vm);
    1873          462 :     cloudsync_memory_free(like_pattern);
    1874              : 
    1875          462 :     if (rc != DBRES_DONE && rc != DBRES_OK && rc != DBRES_ROW) {
    1876              :         // Free collected values
    1877            0 :         for (int i = 0; i < block_count; i++) cloudsync_memory_free((void *)block_values[i]);
    1878            0 :         if (block_values) cloudsync_memory_free((void *)block_values);
    1879            0 :         return cloudsync_set_dberror(data);
    1880              :     }
    1881              : 
    1882              :     // Materialize text (NULL when no alive blocks)
    1883          462 :     char *text = (block_count > 0) ? block_materialize_text(block_values, block_count, delimiter) : NULL;
    1884        22695 :     for (int i = 0; i < block_count; i++) cloudsync_memory_free((void *)block_values[i]);
    1885          462 :     if (block_values) cloudsync_memory_free((void *)block_values);
    1886          462 :     if (block_count > 0 && !text) return DBRES_NOMEM;
    1887              : 
    1888              :     // Update the base table column via the col_merge_stmt (with triggers disabled)
    1889          462 :     dbvm_t *merge_vm = table->col_merge_stmt[col_idx];
    1890          462 :     if (!merge_vm) { cloudsync_memory_free(text); return DBRES_ERROR; }
    1891              : 
    1892              :     // Bind PKs
    1893          462 :     rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, merge_vm);
    1894          462 :     if (rc < 0) { cloudsync_memory_free(text); databasevm_reset(merge_vm); return DBRES_ERROR; }
    1895              : 
    1896              :     // Bind the text value twice (INSERT value + ON CONFLICT UPDATE value)
    1897          462 :     int npks = table->npks;
    1898          462 :     if (text) {
    1899          458 :         rc = databasevm_bind_text(merge_vm, npks + 1, text, -1);
    1900          458 :         if (rc != DBRES_OK) { cloudsync_memory_free(text); databasevm_reset(merge_vm); return rc; }
    1901          458 :         rc = databasevm_bind_text(merge_vm, npks + 2, text, -1);
    1902          458 :         if (rc != DBRES_OK) { cloudsync_memory_free(text); databasevm_reset(merge_vm); return rc; }
    1903          458 :     } else {
    1904            4 :         rc = databasevm_bind_null(merge_vm, npks + 1);
    1905            4 :         if (rc != DBRES_OK) { databasevm_reset(merge_vm); return rc; }
    1906            4 :         rc = databasevm_bind_null(merge_vm, npks + 2);
    1907            4 :         if (rc != DBRES_OK) { databasevm_reset(merge_vm); return rc; }
    1908              :     }
    1909              : 
    1910              :     // Execute with triggers disabled
    1911          462 :     table->enabled = 0;
    1912          462 :     SYNCBIT_SET(data);
    1913          462 :     rc = databasevm_step(merge_vm);
    1914          462 :     databasevm_reset(merge_vm);
    1915          462 :     SYNCBIT_RESET(data);
    1916          462 :     table->enabled = 1;
    1917              : 
    1918          462 :     cloudsync_memory_free(text);
    1919              : 
    1920          462 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    1921          462 :     if (rc != DBRES_OK) return cloudsync_set_dberror(data);
    1922          462 :     return DBRES_OK;
    1923          462 : }
    1924              : 
    1925              : // Accessor for has_block_cols flag
    1926         1024 : bool table_has_block_cols (cloudsync_table_context *table) {
    1927         1024 :     return table && table->has_block_cols;
    1928              : }
    1929              : 
    1930              : // Get block column algo for a given column index
    1931        11587 : col_algo_t table_col_algo (cloudsync_table_context *table, int index) {
    1932        11587 :     if (!table || !table->col_algo || index < 0 || index >= table->ncols) return col_algo_normal;
    1933        11587 :     return table->col_algo[index];
    1934        11587 : }
    1935              : 
    1936              : // Get block delimiter for a given column index
    1937          137 : const char *table_col_delimiter (cloudsync_table_context *table, int index) {
    1938          137 :     if (!table || !table->col_delimiter || index < 0 || index >= table->ncols) return BLOCK_DEFAULT_DELIMITER;
    1939          137 :     return table->col_delimiter[index] ? table->col_delimiter[index] : BLOCK_DEFAULT_DELIMITER;
    1940          137 : }
    1941              : 
    1942              : // Block column struct accessors (for use outside cloudsync.c where struct is opaque)
    1943         1024 : dbvm_t *table_block_value_read_stmt (cloudsync_table_context *table) { return table ? table->block_value_read_stmt : NULL; }
    1944          506 : dbvm_t *table_block_value_write_stmt (cloudsync_table_context *table) { return table ? table->block_value_write_stmt : NULL; }
    1945           93 : dbvm_t *table_block_list_stmt (cloudsync_table_context *table) { return table ? table->block_list_stmt : NULL; }
    1946           93 : const char *table_blocks_ref (cloudsync_table_context *table) { return table ? table->blocks_ref : NULL; }
    1947              : 
    1948            3 : void table_set_col_delimiter (cloudsync_table_context *table, int col_idx, const char *delimiter) {
    1949            3 :     if (!table || !table->col_delimiter || col_idx < 0 || col_idx >= table->ncols) return;
    1950            3 :     if (table->col_delimiter[col_idx]) cloudsync_memory_free(table->col_delimiter[col_idx]);
    1951            3 :     table->col_delimiter[col_idx] = delimiter ? cloudsync_string_dup(delimiter) : NULL;
    1952            3 : }
    1953              : 
    1954              : // Find column index by name
    1955          127 : int table_col_index (cloudsync_table_context *table, const char *col_name) {
    1956          127 :     if (!table || !col_name) return -1;
    1957          131 :     for (int i = 0; i < table->ncols; i++) {
    1958          131 :         if (strcasecmp(table->col_name[i], col_name) == 0) return i;
    1959            4 :     }
    1960            0 :     return -1;
    1961          127 : }
    1962              : 
    1963        46090 : int merge_insert (cloudsync_context *data, cloudsync_table_context *table, const char *insert_pk, int insert_pk_len, int64_t insert_cl, const char *insert_name, dbvalue_t *insert_value, int64_t insert_col_version, int64_t insert_db_version, const char *insert_site_id, int insert_site_id_len, int64_t insert_seq, int64_t *rowid) {
    1964              :     // Handle DWS and AWS algorithms here
    1965              :     // Delete-Wins Set (DWS): table_algo_crdt_dws
    1966              :     // Add-Wins Set (AWS): table_algo_crdt_aws
    1967              :     
    1968              :     // Causal-Length Set (CLS) Algorithm (default)
    1969              :     
    1970              :     // compute the local causal length for the row based on the primary key
    1971              :     // the causal length is used to determine the order of operations and resolve conflicts.
    1972        46090 :     int64_t local_cl = merge_get_local_cl(table, insert_pk, insert_pk_len);
    1973        46090 :     if (local_cl < 0) return cloudsync_set_error(data, "Unable to compute local causal length", DBRES_ERROR);
    1974              :     
    1975              :     // if the incoming causal length is older than the local causal length, we can safely ignore it
    1976              :     // because the local changes are more recent
    1977        46090 :     if (insert_cl < local_cl) return DBRES_OK;
    1978              :     
    1979              :     // check if the operation is a delete by examining the causal length
    1980              :     // even causal lengths typically signify delete operations
    1981        45880 :     bool is_delete = (insert_cl % 2 == 0);
    1982        45880 :     if (is_delete) {
    1983              :         // if it's a delete, check if the local state is at the same causal length
    1984              :         // if it is, no further action is needed
    1985          606 :         if (local_cl == insert_cl) return DBRES_OK;
    1986              :         
    1987              :         // perform a delete merge if the causal length is newer than the local one
    1988          324 :         int rc = merge_delete(data, table, insert_pk, insert_pk_len, insert_name, insert_col_version,
    1989          162 :                               insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
    1990          162 :         if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_delete", rc);
    1991          162 :         return rc;
    1992              :     }
    1993              :     
    1994              :     // if the operation is a sentinel-only insert (indicating a new row or resurrected row with no column update), handle it separately.
    1995        45274 :     bool is_sentinel_only = (strcmp(insert_name, CLOUDSYNC_TOMBSTONE_VALUE) == 0);
    1996        45274 :     if (is_sentinel_only) {
    1997          190 :         if (local_cl == insert_cl) return DBRES_OK;
    1998              :         
    1999              :         // perform a sentinel-only insert to track the existence of the row
    2000          116 :         int rc = merge_sentinel_only_insert(data, table, insert_pk, insert_pk_len, insert_col_version,
    2001           58 :                                             insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
    2002           58 :         if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_sentinel_only_insert", rc);
    2003           58 :         return rc;
    2004              :     }
    2005              :     
    2006              :     // from this point I can be sure that insert_name is not sentinel
    2007              :     
    2008              :     // handle the case where a row is being resurrected (e.g., after a delete, a new insert for the same row)
    2009              :     // odd causal lengths can "resurrect" rows
    2010        45084 :     bool needs_resurrect = (insert_cl > local_cl && insert_cl % 2 == 1);
    2011        45084 :     bool row_exists_locally = local_cl != 0;
    2012              :    
    2013              :     // if a resurrection is needed, insert a sentinel to mark the row as alive
    2014              :     // this handles out-of-order deliveries where the row was deleted and is now being re-inserted
    2015        45084 :     if (needs_resurrect && (row_exists_locally || (!row_exists_locally && insert_cl > 1))) {
    2016            0 :         int rc = merge_sentinel_only_insert(data, table, insert_pk, insert_pk_len, insert_cl,
    2017            0 :                                             insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
    2018            0 :         if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to perform merge_sentinel_only_insert", rc);
    2019            0 :     }
    2020              :     
    2021              :     // at this point, we determine whether the incoming change wins based on causal length
    2022              :     // this can be due to a resurrection, a non-existent local row, or a conflict resolution
    2023        45084 :     bool flag = false;
    2024        45084 :     int rc = merge_did_cid_win(data, table, insert_pk, insert_pk_len, insert_value, insert_site_id, insert_site_id_len, insert_name, insert_col_version, &flag);
    2025        45084 :     if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to perform merge_did_cid_win", rc);
    2026              :     
    2027              :     // check if the incoming change wins and should be applied
    2028        45084 :     bool does_cid_win = ((needs_resurrect) || (!row_exists_locally) || (flag));
    2029        45084 :     if (!does_cid_win) return DBRES_OK;
    2030              : 
    2031              :     // Block-level LWW: if the incoming col_name is a block entry (contains \x1F),
    2032              :     // bypass the normal base-table write and instead store the value in the blocks table.
    2033              :     // The base table column will be materialized from all alive blocks.
    2034        21771 :     if (block_is_block_colname(insert_name) && table->has_block_cols) {
    2035              :         // Store or delete block value in blocks table depending on tombstone status
    2036          412 :         if (insert_col_version % 2 == 0) {
    2037              :             // Tombstone: remove from blocks table
    2038           33 :             rc = block_delete_value(data, table, insert_pk, insert_pk_len, insert_name);
    2039           33 :         } else {
    2040          379 :             rc = block_store_value(data, table, insert_pk, insert_pk_len, insert_name, insert_value);
    2041              :         }
    2042          412 :         if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to store/delete block value", rc);
    2043              : 
    2044              :         // Set winner clock in metadata
    2045          824 :         rc = merge_set_winner_clock(data, table, insert_pk, insert_pk_len, insert_name,
    2046          412 :                                     insert_col_version, insert_db_version,
    2047          412 :                                     insert_site_id, insert_site_id_len, insert_seq, rowid);
    2048          412 :         if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to set winner clock for block", rc);
    2049              : 
    2050              :         // Materialize the full column from blocks into the base table
    2051          412 :         char *base_col = block_extract_base_colname(insert_name);
    2052          412 :         if (base_col) {
    2053          412 :             rc = block_materialize_column(data, table, insert_pk, insert_pk_len, base_col);
    2054          412 :             cloudsync_memory_free(base_col);
    2055          412 :             if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to materialize block column", rc);
    2056          412 :         }
    2057              : 
    2058          412 :         return DBRES_OK;
    2059              :     }
    2060              : 
    2061              :     // perform the final column insert or update if the incoming change wins
    2062        21359 :     if (data->pending_batch) {
    2063              :         // Propagate row_exists_locally to the batch on the first winning column.
    2064              :         // This lets merge_flush_pending choose UPDATE vs INSERT ON CONFLICT,
    2065              :         // which matters when RLS policies reference columns not in the payload.
    2066        21342 :         if (data->pending_batch->table == NULL) {
    2067         7702 :             data->pending_batch->row_exists = row_exists_locally;
    2068         7702 :         }
    2069        21342 :         rc = merge_pending_add(data, table, insert_pk, insert_pk_len, insert_name, insert_value, insert_col_version, insert_db_version, insert_site_id, insert_site_id_len, insert_seq);
    2070        21342 :         if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_pending_add", rc);
    2071        21342 :     } else {
    2072           17 :         rc = merge_insert_col(data, table, insert_pk, insert_pk_len, insert_name, insert_value, insert_col_version, insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
    2073           17 :         if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_insert_col", rc);
    2074              :     }
    2075              : 
    2076        21359 :     return rc;
    2077        46090 : }
    2078              : 
    2079              : // MARK: - Block column setup -
    2080              : 
    2081              : // Migrate existing tracked rows to block format when block-level LWW is first enabled on a column.
    2082              : // Scans the metadata table for alive rows with the plain col_name entry (not yet block entries),
    2083              : // reads each row's current value from the base table, splits it into blocks, and inserts
    2084              : // the block entries into both the blocks table and the metadata table.
    2085              : // Uses INSERT OR IGNORE semantics so the operation is safe to call multiple times.
    2086           73 : static int block_migrate_existing_rows (cloudsync_context *data, cloudsync_table_context *table, int col_idx) {
    2087           73 :     const char *col_name = table->col_name[col_idx];
    2088           73 :     if (!col_name || !table->meta_ref || !table->blocks_ref) return DBRES_OK;
    2089              : 
    2090           73 :     const char *delim = table->col_delimiter[col_idx] ? table->col_delimiter[col_idx] : BLOCK_DEFAULT_DELIMITER;
    2091           73 :     int64_t db_version = cloudsync_dbversion_next(data, CLOUDSYNC_VALUE_NOTSET);
    2092              : 
    2093              :     // Phase 1: collect all existing PKs that have an alive regular col_name entry
    2094              :     // AND do not yet have any entries in the blocks table for this column.
    2095              :     // The NOT IN filter makes this idempotent: rows that were already migrated
    2096              :     // (or had their blocks created via INSERT) are skipped on subsequent calls.
    2097              :     // We collect PKs before writing so that writes to the metadata table (Phase 2)
    2098              :     // do not perturb the read cursor on the same table.
    2099           73 :     char *like_pattern = block_build_colname(col_name, "%");
    2100           73 :     if (!like_pattern) return DBRES_NOMEM;
    2101              : 
    2102           73 :     char *scan_sql = cloudsync_memory_mprintf(SQL_META_SCAN_COL_FOR_MIGRATION, table->meta_ref, table->blocks_ref);
    2103           73 :     if (!scan_sql) { cloudsync_memory_free(like_pattern); return DBRES_NOMEM; }
    2104           73 :     dbvm_t *scan_vm = NULL;
    2105           73 :     int rc = databasevm_prepare(data, scan_sql, &scan_vm, 0);
    2106           73 :     cloudsync_memory_free(scan_sql);
    2107           73 :     if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); return rc; }
    2108              : 
    2109           73 :     rc = databasevm_bind_text(scan_vm, 1, col_name, -1);
    2110           73 :     if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_finalize(scan_vm); return rc; }
    2111              :     // Bind like_pattern as ?2 and keep it alive until after all scan steps complete,
    2112              :     // because databasevm_bind_text uses SQLITE_STATIC (no copy).
    2113           73 :     rc = databasevm_bind_text(scan_vm, 2, like_pattern, -1);
    2114           73 :     if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_finalize(scan_vm); return rc; }
    2115              : 
    2116              :     // Collect pk blobs into a dynamically-grown array of owned copies
    2117           73 :     void  **pks     = NULL;
    2118           73 :     size_t *pklens  = NULL;
    2119           73 :     int     pk_count = 0;
    2120           73 :     int     pk_cap   = 0;
    2121              : 
    2122           75 :     while ((rc = databasevm_step(scan_vm)) == DBRES_ROW) {
    2123            2 :         size_t pklen = 0;
    2124            2 :         const void *pk = database_column_blob(scan_vm, 0, &pklen);
    2125            2 :         if (!pk || pklen == 0) continue;
    2126              : 
    2127            2 :         if (pk_count >= pk_cap) {
    2128            1 :             int new_cap = pk_cap ? pk_cap * 2 : 8;
    2129            1 :             void  **new_pks    = (void  **)cloudsync_memory_realloc(pks,    (uint64_t)(new_cap * sizeof(void *)));
    2130            1 :             size_t *new_pklens = (size_t *)cloudsync_memory_realloc(pklens, (uint64_t)(new_cap * sizeof(size_t)));
    2131            1 :             if (!new_pks || !new_pklens) {
    2132            0 :                 cloudsync_memory_free(new_pks ? new_pks : pks);
    2133            0 :                 cloudsync_memory_free(new_pklens ? new_pklens : pklens);
    2134            0 :                 databasevm_finalize(scan_vm);
    2135            0 :                 return DBRES_NOMEM;
    2136              :             }
    2137            1 :             pks    = new_pks;
    2138            1 :             pklens = new_pklens;
    2139            1 :             pk_cap = new_cap;
    2140            1 :         }
    2141              : 
    2142            2 :         pks[pk_count] = cloudsync_memory_alloc((uint64_t)pklen);
    2143            2 :         if (!pks[pk_count]) { rc = DBRES_NOMEM; break; }
    2144            2 :         memcpy(pks[pk_count], pk, pklen);
    2145            2 :         pklens[pk_count] = pklen;
    2146            2 :         pk_count++;
    2147              :     }
    2148              : 
    2149           73 :     databasevm_finalize(scan_vm);
    2150           73 :     cloudsync_memory_free(like_pattern); // safe to free after scan_vm is finalized
    2151           73 :     if (rc != DBRES_DONE && rc != DBRES_OK) {
    2152            0 :         for (int i = 0; i < pk_count; i++) cloudsync_memory_free(pks[i]);
    2153            0 :         cloudsync_memory_free(pks);
    2154            0 :         cloudsync_memory_free(pklens);
    2155            0 :         return rc;
    2156              :     }
    2157              : 
    2158           73 :     if (pk_count == 0) {
    2159           72 :         cloudsync_memory_free(pks);
    2160           72 :         cloudsync_memory_free(pklens);
    2161           72 :         return DBRES_OK;
    2162              :     }
    2163              : 
    2164              :     // Phase 2: for each collected PK, read the column value, split into blocks,
    2165              :     // and insert into the blocks table + metadata using INSERT OR IGNORE.
    2166              : 
    2167            1 :     char *meta_sql = cloudsync_memory_mprintf(SQL_META_INSERT_BLOCK_IGNORE, table->meta_ref);
    2168            1 :     if (!meta_sql) { rc = DBRES_NOMEM; goto cleanup_pks; }
    2169            1 :     dbvm_t *meta_vm = NULL;
    2170            1 :     rc = databasevm_prepare(data, meta_sql, &meta_vm, 0);
    2171            1 :     cloudsync_memory_free(meta_sql);
    2172            1 :     if (rc != DBRES_OK) goto cleanup_pks;
    2173              : 
    2174            1 :     char *blocks_sql = cloudsync_memory_mprintf(SQL_BLOCKS_INSERT_IGNORE, table->blocks_ref);
    2175            1 :     if (!blocks_sql) { databasevm_finalize(meta_vm); rc = DBRES_NOMEM; goto cleanup_pks; }
    2176            1 :     dbvm_t *blocks_vm = NULL;
    2177            1 :     rc = databasevm_prepare(data, blocks_sql, &blocks_vm, 0);
    2178            1 :     cloudsync_memory_free(blocks_sql);
    2179            1 :     if (rc != DBRES_OK) { databasevm_finalize(meta_vm); goto cleanup_pks; }
    2180              : 
    2181            1 :     dbvm_t *val_vm = (dbvm_t *)table_column_lookup(table, col_name, false, NULL);
    2182              : 
    2183            3 :     for (int p = 0; p < pk_count; p++) {
    2184            2 :         const void *pk = pks[p];
    2185            2 :         size_t pklen   = pklens[p];
    2186              : 
    2187            2 :         if (!val_vm) continue;
    2188              : 
    2189              :         // Read current column value from the base table
    2190            2 :         int bind_rc = pk_decode_prikey((char *)pk, pklen, pk_decode_bind_callback, (void *)val_vm);
    2191            2 :         if (bind_rc < 0) { databasevm_reset(val_vm); continue; }
    2192              : 
    2193            2 :         int step_rc = databasevm_step(val_vm);
    2194            2 :         const char *text = (step_rc == DBRES_ROW) ? database_column_text(val_vm, 0) : NULL;
    2195              :         // Make a copy of text before resetting val_vm, as the pointer is only valid until reset
    2196            2 :         char *text_copy = text ? cloudsync_string_dup(text) : NULL;
    2197            2 :         databasevm_reset(val_vm);
    2198              : 
    2199            2 :         if (!text_copy) continue; // NULL column value: nothing to migrate
    2200              : 
    2201              :         // Split text into blocks and store each one
    2202            2 :         block_list_t *blocks = block_split(text_copy, delim);
    2203            2 :         cloudsync_memory_free(text_copy);
    2204            2 :         if (!blocks) continue;
    2205              : 
    2206            2 :         char **positions = block_initial_positions(blocks->count);
    2207            2 :         if (positions) {
    2208            7 :             for (int b = 0; b < blocks->count; b++) {
    2209            5 :                 char *block_cn = block_build_colname(col_name, positions[b]);
    2210            5 :                 if (block_cn) {
    2211              :                     // Metadata entry (skip if this block position already exists)
    2212            5 :                     databasevm_bind_blob(meta_vm, 1, pk, (int)pklen);
    2213            5 :                     databasevm_bind_text(meta_vm, 2, block_cn, -1);
    2214            5 :                     databasevm_bind_int(meta_vm, 3, 1);            // col_version = 1 (alive)
    2215            5 :                     databasevm_bind_int(meta_vm, 4, db_version);
    2216            5 :                     databasevm_bind_int(meta_vm, 5, cloudsync_bumpseq(data));
    2217            5 :                     databasevm_step(meta_vm);
    2218            5 :                     databasevm_reset(meta_vm);
    2219              : 
    2220              :                     // Block value (skip if this block position already exists)
    2221            5 :                     databasevm_bind_blob(blocks_vm, 1, pk, (int)pklen);
    2222            5 :                     databasevm_bind_text(blocks_vm, 2, block_cn, -1);
    2223            5 :                     databasevm_bind_text(blocks_vm, 3, blocks->entries[b].content, -1);
    2224            5 :                     databasevm_step(blocks_vm);
    2225            5 :                     databasevm_reset(blocks_vm);
    2226              : 
    2227            5 :                     cloudsync_memory_free(block_cn);
    2228            5 :                 }
    2229            5 :                 cloudsync_memory_free(positions[b]);
    2230            5 :             }
    2231            2 :             cloudsync_memory_free(positions);
    2232            2 :         }
    2233            2 :         block_list_free(blocks);
    2234            2 :     }
    2235              : 
    2236            1 :     databasevm_finalize(meta_vm);
    2237            1 :     databasevm_finalize(blocks_vm);
    2238            1 :     rc = DBRES_OK;
    2239              : 
    2240              : cleanup_pks:
    2241            3 :     for (int i = 0; i < pk_count; i++) cloudsync_memory_free(pks[i]);
    2242            1 :     cloudsync_memory_free(pks);
    2243            1 :     cloudsync_memory_free(pklens);
    2244            1 :     return rc;
    2245           73 : }
    2246              : 
    2247           74 : int cloudsync_setup_block_column (cloudsync_context *data, const char *table_name, const char *col_name, const char *delimiter, bool persist) {
    2248           74 :     cloudsync_table_context *table = table_lookup(data, table_name);
    2249           74 :     if (!table) return cloudsync_set_error(data, "cloudsync_setup_block_column: table not found", DBRES_ERROR);
    2250              : 
    2251              :     // Find column index
    2252           74 :     int col_idx = table_col_index(table, col_name);
    2253           74 :     if (col_idx < 0) {
    2254              :         char buf[1024];
    2255            0 :         snprintf(buf, sizeof(buf), "cloudsync_setup_block_column: column '%s' not found in table '%s'", col_name, table_name);
    2256            0 :         return cloudsync_set_error(data, buf, DBRES_ERROR);
    2257              :     }
    2258              : 
    2259              :     // Set column algo
    2260           74 :     table->col_algo[col_idx] = col_algo_block;
    2261           74 :     table->has_block_cols = true;
    2262              : 
    2263              :     // Set delimiter (can be NULL for default)
    2264           74 :     if (table->col_delimiter[col_idx]) {
    2265            0 :         cloudsync_memory_free(table->col_delimiter[col_idx]);
    2266            0 :         table->col_delimiter[col_idx] = NULL;
    2267            0 :     }
    2268           74 :     if (delimiter) {
    2269            1 :         table->col_delimiter[col_idx] = cloudsync_string_dup(delimiter);
    2270            1 :     }
    2271              : 
    2272              :     // Create blocks table if not already done
    2273           74 :     if (!table->blocks_ref) {
    2274           71 :         table->blocks_ref = database_build_blocks_ref(table->schema, table->name);
    2275           71 :         if (!table->blocks_ref) return DBRES_NOMEM;
    2276              : 
    2277              :         // CREATE TABLE IF NOT EXISTS
    2278           71 :         char *sql = cloudsync_memory_mprintf(SQL_BLOCKS_CREATE_TABLE, table->blocks_ref);
    2279           71 :         if (!sql) return DBRES_NOMEM;
    2280              : 
    2281           71 :         int rc = database_exec(data, sql);
    2282           71 :         cloudsync_memory_free(sql);
    2283           71 :         if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to create blocks table", rc);
    2284              : 
    2285              :         // Prepare block statements
    2286              :         // Write: upsert into blocks (pk, col_name, col_value)
    2287           71 :         sql = cloudsync_memory_mprintf(SQL_BLOCKS_UPSERT, table->blocks_ref);
    2288           71 :         if (!sql) return DBRES_NOMEM;
    2289           71 :         rc = databasevm_prepare(data, sql, (void **)&table->block_value_write_stmt, DBFLAG_PERSISTENT);
    2290           71 :         cloudsync_memory_free(sql);
    2291           71 :         if (rc != DBRES_OK) return rc;
    2292              : 
    2293              :         // Read: SELECT col_value FROM blocks WHERE pk = ? AND col_name = ?
    2294           71 :         sql = cloudsync_memory_mprintf(SQL_BLOCKS_SELECT, table->blocks_ref);
    2295           71 :         if (!sql) return DBRES_NOMEM;
    2296           71 :         rc = databasevm_prepare(data, sql, (void **)&table->block_value_read_stmt, DBFLAG_PERSISTENT);
    2297           71 :         cloudsync_memory_free(sql);
    2298           71 :         if (rc != DBRES_OK) return rc;
    2299              : 
    2300              :         // Delete: DELETE FROM blocks WHERE pk = ? AND col_name = ?
    2301           71 :         sql = cloudsync_memory_mprintf(SQL_BLOCKS_DELETE, table->blocks_ref);
    2302           71 :         if (!sql) return DBRES_NOMEM;
    2303           71 :         rc = databasevm_prepare(data, sql, (void **)&table->block_value_delete_stmt, DBFLAG_PERSISTENT);
    2304           71 :         cloudsync_memory_free(sql);
    2305           71 :         if (rc != DBRES_OK) return rc;
    2306              : 
    2307              :         // List alive blocks for materialization
    2308           71 :         sql = cloudsync_memory_mprintf(SQL_BLOCKS_LIST_ALIVE, table->blocks_ref, table->meta_ref);
    2309           71 :         if (!sql) return DBRES_NOMEM;
    2310           71 :         rc = databasevm_prepare(data, sql, (void **)&table->block_list_stmt, DBFLAG_PERSISTENT);
    2311           71 :         cloudsync_memory_free(sql);
    2312           71 :         if (rc != DBRES_OK) return rc;
    2313           71 :     }
    2314              : 
    2315              :     // Persist settings (skipped when called from the settings loader, since
    2316              :     // writing to cloudsync_table_settings while sqlite3_exec is iterating it
    2317              :     // re-feeds the rewritten row to the cursor and causes an infinite loop).
    2318           74 :     if (persist) {
    2319           73 :         int rc = dbutils_table_settings_set_key_value(data, table_name, col_name, "algo", "block");
    2320           73 :         if (rc != DBRES_OK) return rc;
    2321              : 
    2322           73 :         if (delimiter) {
    2323            0 :             rc = dbutils_table_settings_set_key_value(data, table_name, col_name, "delimiter", delimiter);
    2324            0 :             if (rc != DBRES_OK) return rc;
    2325            0 :         }
    2326              : 
    2327              :         // Migrate any existing tracked rows: populate the blocks table and metadata with
    2328              :         // block entries derived from the current column value, so that subsequent UPDATE
    2329              :         // operations can diff against the real existing state instead of treating everything
    2330              :         // as new, and so this node participates correctly in LWW conflict resolution.
    2331           73 :         rc = block_migrate_existing_rows(data, table, col_idx);
    2332           73 :         if (rc != DBRES_OK) return rc;
    2333           73 :     }
    2334              : 
    2335           74 :     return DBRES_OK;
    2336           74 : }
    2337              : 
    2338              : // MARK: - Private -
    2339              : 
    2340          234 : bool cloudsync_config_exists (cloudsync_context *data) {
    2341          234 :     return database_internal_table_exists(data, CLOUDSYNC_SITEID_NAME) == true;
    2342              : }
    2343              : 
    2344          713 : bool cloudsync_context_is_initialized (cloudsync_context *data) {
    2345              :     // A fully initialized context has its persistent "is the DB stale" probe
    2346              :     // prepared. cloudsync_context_init prepares data_version_stmt (via
    2347              :     // cloudsync_add_dbvms) only after the cloudsync_site_id table exists, so
    2348              :     // a non-NULL pointer means cloudsync_init has been called at least once
    2349              :     // on this connection. Used to produce actionable error messages when
    2350              :     // callers hit a function before calling cloudsync_init.
    2351          713 :     return data != NULL && data->data_version_stmt != NULL;
    2352              : }
    2353              : 
    2354          249 : cloudsync_context *cloudsync_context_create (void *db) {
    2355          249 :     cloudsync_context *data = (cloudsync_context *)cloudsync_memory_zeroalloc((uint64_t)(sizeof(cloudsync_context)));
    2356          249 :     if (!data) return NULL;
    2357              :     DEBUG_SETTINGS("cloudsync_context_create %p", data);
    2358              :     
    2359          249 :     data->libversion = CLOUDSYNC_VERSION;
    2360          249 :     data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
    2361              :     #if CLOUDSYNC_DEBUG
    2362              :     data->debug = 1;
    2363              :     #endif
    2364              :     
    2365              :     // allocate space for 64 tables (it can grow if needed)
    2366          249 :     uint64_t mem_needed = (uint64_t)(CLOUDSYNC_INIT_NTABLES * sizeof(cloudsync_table_context *));
    2367          249 :     data->tables = (cloudsync_table_context **)cloudsync_memory_zeroalloc(mem_needed);
    2368          249 :     if (!data->tables) {cloudsync_memory_free(data); return NULL;}
    2369              :     
    2370          249 :     data->tables_cap = CLOUDSYNC_INIT_NTABLES;
    2371          249 :     data->tables_count = 0;
    2372          249 :     data->db = db;
    2373              :     
    2374              :     // SQLite exposes col_value as ANY, but other databases require a concrete type.
    2375              :     // In PostgreSQL we expose col_value as bytea, which holds the pk-encoded value bytes (type + data).
    2376              :     // Because col_value is already encoded, we skip decoding this field and pass it through as bytea.
    2377              :     // It is decoded to the target column type just before applying changes to the base table.
    2378          249 :     data->skip_decode_idx = (db == NULL) ? CLOUDSYNC_PK_INDEX_COLVALUE : -1;
    2379              : 
    2380          249 :     return data;
    2381          249 : }
    2382              : 
    2383          249 : void cloudsync_context_free (void *ctx) {
    2384          249 :     cloudsync_context *data = (cloudsync_context *)ctx;
    2385              :     DEBUG_SETTINGS("cloudsync_context_free %p", data);
    2386          249 :     if (!data) return;
    2387              : 
    2388              :     // free all table contexts and prepared statements
    2389          249 :     cloudsync_terminate(data);
    2390              : 
    2391          249 :     cloudsync_memory_free(data->tables);
    2392          249 :     cloudsync_memory_free(data);
    2393          249 : }
    2394              : 
    2395          344 : const char *cloudsync_context_init (cloudsync_context *data) {
    2396          344 :     if (!data) return NULL;
    2397              :     
    2398              :     // perform init just the first time, if the site_id field is not set.
    2399              :     // The data->site_id value could exists while settings tables don't exists if the
    2400              :     // cloudsync_context_init was previously called in init transaction that was rolled back
    2401              :     // because of an error during the init process.
    2402          344 :     if (data->site_id[0] == 0 || !database_internal_table_exists(data, CLOUDSYNC_SITEID_NAME)) {
    2403          229 :         if (dbutils_settings_init(data) != DBRES_OK) return NULL;
    2404          229 :         if (cloudsync_add_dbvms(data) != DBRES_OK) return NULL;
    2405          229 :         if (cloudsync_load_siteid(data) != DBRES_OK) return NULL;
    2406          229 :         data->schema_hash = database_schema_hash(data);
    2407          229 :     }
    2408              :     
    2409          344 :     return (const char *)data->site_id;
    2410          344 : }
    2411              : 
    2412         1370 : void cloudsync_sync_key (cloudsync_context *data, const char *key, const char *value) {
    2413              :     DEBUG_SETTINGS("cloudsync_sync_key key: %s value: %s", key, value);
    2414              :     
    2415              :     // sync data
    2416         1370 :     if (strcmp(key, CLOUDSYNC_KEY_SCHEMAVERSION) == 0) {
    2417          229 :         data->schema_version = (int)strtol(value, NULL, 0);
    2418          229 :         return;
    2419              :     }
    2420              :     
    2421         1141 :     if (strcmp(key, CLOUDSYNC_KEY_DEBUG) == 0) {
    2422            0 :         data->debug = 0;
    2423            0 :         if (value && (value[0] != 0) && (value[0] != '0')) data->debug = 1;
    2424            0 :         return;
    2425              :     }
    2426              : 
    2427         1141 :     if (strcmp(key, CLOUDSYNC_KEY_SCHEMA) == 0) {
    2428            0 :         cloudsync_set_schema(data, value);
    2429            0 :         return;
    2430              :     }
    2431         1370 : }
    2432              : 
    #if 0
    // Per-table / per-column settings hook. Compiled out: unused in this version.
    void cloudsync_sync_table_key(cloudsync_context *data, const char *table, const char *column, const char *key, const char *value) {
        DEBUG_SETTINGS("cloudsync_sync_table_key table: %s column: %s key: %s value: %s", table, column, key, value);
        // Unused in this version
    }
    #endif
    2440              : 
    2441         8316 : int cloudsync_commit_hook (void *ctx) {
    2442         8316 :     cloudsync_context *data = (cloudsync_context *)ctx;
    2443              :     
    2444         8316 :     data->db_version = data->pending_db_version;
    2445         8316 :     data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
    2446         8316 :     data->seq = 0;
    2447              :     
    2448         8316 :     return DBRES_OK;
    2449              : }
    2450              : 
    2451            3 : void cloudsync_rollback_hook (void *ctx) {
    2452            3 :     cloudsync_context *data = (cloudsync_context *)ctx;
    2453              :     
    2454            3 :     data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
    2455            3 :     data->seq = 0;
    2456            3 : }
    2457              : 
    2458           24 : int cloudsync_begin_alter (cloudsync_context *data, const char *table_name) {
    2459              :     // init cloudsync_settings
    2460           24 :     if (cloudsync_context_init(data) == NULL) {
    2461            0 :         return DBRES_MISUSE;
    2462              :     }
    2463              : 
    2464              :     // lookup table
    2465           24 :     cloudsync_table_context *table = table_lookup(data, table_name);
    2466           24 :     if (!table) {
    2467              :         char buffer[1024];
    2468            1 :         snprintf(buffer, sizeof(buffer), "Unable to find table %s", table_name);
    2469            1 :         return cloudsync_set_error(data, buffer, DBRES_MISUSE);
    2470              :     }
    2471              : 
    2472              :     // idempotent: if already altering, return OK
    2473           23 :     if (table->is_altering) return DBRES_OK;
    2474              : 
    2475              :     // retrieve primary key(s)
    2476           23 :     char **names = NULL;
    2477           23 :     int nrows = 0;
    2478           23 :     int rc = database_pk_names(data, table_name, &names, &nrows);
    2479           23 :     if (rc != DBRES_OK) {
    2480              :         char buffer[1024];
    2481            0 :         snprintf(buffer, sizeof(buffer), "Unable to get primary keys for table %s", table_name);
    2482            0 :         cloudsync_set_error(data, buffer, DBRES_MISUSE);
    2483            0 :         goto rollback_begin_alter;
    2484              :     }
    2485              : 
    2486              :     // sanity check the number of primary keys
    2487           23 :     if (nrows != table_count_pks(table)) {
    2488              :         char buffer[1024];
    2489            0 :         snprintf(buffer, sizeof(buffer), "Number of primary keys for table %s changed before ALTER", table_name);
    2490            0 :         cloudsync_set_error(data, buffer, DBRES_MISUSE);
    2491            0 :         goto rollback_begin_alter;
    2492              :     }
    2493              : 
    2494              :     // drop original triggers
    2495           23 :     rc = database_delete_triggers(data, table_name);
    2496           23 :     if (rc != DBRES_OK) {
    2497              :         char buffer[1024];
    2498            0 :         snprintf(buffer, sizeof(buffer), "Unable to delete triggers for table %s in cloudsync_begin_alter.", table_name);
    2499            0 :         cloudsync_set_error(data, buffer, DBRES_ERROR);
    2500            0 :         goto rollback_begin_alter;
    2501              :     }
    2502              : 
    2503           23 :     table_set_pknames(table, names);
    2504           23 :     table->is_altering = true;
    2505           23 :     return DBRES_OK;
    2506              : 
    2507              : rollback_begin_alter:
    2508            0 :     if (names) table_pknames_free(names, nrows);
    2509            0 :     return rc;
    2510           24 : }
    2511              : 
    2512           23 : int cloudsync_finalize_alter (cloudsync_context *data, cloudsync_table_context *table) {
    2513              :     // check if dbversion needed to be updated
    2514           23 :     cloudsync_dbversion_check_uptodate(data);
    2515              : 
    2516              :     // if primary-key columns change, all row identities change.
    2517              :     // In that case, the clock table must be dropped, recreated,
    2518              :     // and backfilled. We detect this by comparing the unique index
    2519              :     // in the lookaside table with the source table's PKs.
    2520              :     
    2521              :     // retrieve primary keys (to check is they changed)
    2522           23 :     char **result = NULL;
    2523           23 :     int nrows = 0;
    2524           23 :     int rc = database_pk_names (data, table->name, &result, &nrows);
    2525           23 :     if (rc != DBRES_OK || nrows == 0) {
    2526            0 :         if (nrows == 0) rc = DBRES_MISUSE;
    2527            0 :         goto finalize;
    2528              :     }
    2529              :     
    2530              :     // check if there are differences
    2531           23 :     bool pk_diff = (nrows != table->npks);
    2532           23 :     if (!pk_diff) {
    2533           45 :         for (int i = 0; i < nrows; ++i) {
    2534           34 :             if (strcmp(table->pk_name[i], result[i]) != 0) {
    2535            6 :                 pk_diff = true;
    2536            6 :                 break;
    2537              :             }
    2538           28 :         }
    2539           17 :     }
    2540              :     
    2541           23 :     if (pk_diff) {
    2542              :         // drop meta-table, it will be recreated
    2543           12 :         char *sql = cloudsync_memory_mprintf(SQL_DROP_CLOUDSYNC_TABLE, table->meta_ref);
    2544           12 :         rc = database_exec(data, sql);
    2545           12 :         cloudsync_memory_free(sql);
    2546           12 :         if (rc != DBRES_OK) {
    2547            0 :             DEBUG_DBERROR(rc, "cloudsync_finalize_alter", data);
    2548            0 :             goto finalize;
    2549              :         }
    2550           12 :     } else {
    2551              :         // compact meta-table
    2552              :         // delete entries for removed columns
    2553           11 :         const char *schema = table->schema ? table->schema : "";
    2554           11 :         char *sql = sql_build_delete_cols_not_in_schema_query(schema, table->name, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
    2555           11 :         rc = database_exec(data, sql);
    2556           11 :         cloudsync_memory_free(sql);
    2557           11 :         if (rc != DBRES_OK) {
    2558            0 :             DEBUG_DBERROR(rc, "cloudsync_finalize_alter", data);
    2559            0 :             goto finalize;
    2560              :         }
    2561              :         
    2562           11 :         sql = sql_build_pk_qualified_collist_query(schema, table->name);
    2563           11 :         if (!sql) {rc = DBRES_NOMEM; goto finalize;}
    2564              :         
    2565           11 :         char *pkclause = NULL;
    2566           11 :         rc = database_select_text(data, sql, &pkclause);
    2567           11 :         cloudsync_memory_free(sql);
    2568           11 :         if (rc != DBRES_OK) goto finalize;
    2569           11 :         char *pkvalues = (pkclause) ? pkclause : "rowid";
    2570              :         
    2571              :         // delete entries related to rows that no longer exist in the original table, but preserve tombstone
    2572           11 :         sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_GC_DELETE_ORPHANED_PK, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, CLOUDSYNC_TOMBSTONE_VALUE, table->base_ref, table->meta_ref, pkvalues);
    2573           11 :         rc = database_exec(data, sql);
    2574           11 :         if (pkclause) cloudsync_memory_free(pkclause);
    2575           11 :         cloudsync_memory_free(sql);
    2576           11 :         if (rc != DBRES_OK) {
    2577            0 :             DEBUG_DBERROR(rc, "cloudsync_finalize_alter", data);
    2578            0 :             goto finalize;
    2579              :         }
    2580              : 
    2581              :     }
    2582              :     
    2583              :     // update key to be later used in cloudsync_dbversion_rebuild
    2584              :     char buf[256];
    2585           23 :     snprintf(buf, sizeof(buf), "%" PRId64, data->db_version);
    2586           23 :     dbutils_settings_set_key_value(data, "pre_alter_dbversion", buf);
    2587              :     
    2588              : finalize:
    2589           23 :     table_pknames_free(result, nrows);
    2590           23 :     return rc;
    2591              : }
    2592              : 
    2593           24 : int cloudsync_commit_alter (cloudsync_context *data, const char *table_name) {
    2594           24 :     int rc = DBRES_MISUSE;
    2595           24 :     cloudsync_table_context *table = NULL;
    2596              : 
    2597              :     // init cloudsync_settings
    2598           24 :     if (cloudsync_context_init(data) == NULL) {
    2599            0 :         cloudsync_set_error(data, "Unable to initialize cloudsync context", DBRES_MISUSE);
    2600            0 :         goto rollback_finalize_alter;
    2601              :     }
    2602              : 
    2603              :     // lookup table
    2604           24 :     table = table_lookup(data, table_name);
    2605           24 :     if (!table) {
    2606              :         char buffer[1024];
    2607            1 :         snprintf(buffer, sizeof(buffer), "Unable to find table %s", table_name);
    2608            1 :         cloudsync_set_error(data, buffer, DBRES_MISUSE);
    2609            1 :         goto rollback_finalize_alter;
    2610              :     }
    2611              : 
    2612              :     // idempotent: if not altering, return OK
    2613           23 :     if (!table->is_altering) return DBRES_OK;
    2614              : 
    2615           23 :     rc = cloudsync_finalize_alter(data, table);
    2616           23 :     if (rc != DBRES_OK) goto rollback_finalize_alter;
    2617              : 
    2618              :     // the table is outdated, delete it and it will be reloaded in the cloudsync_init_internal
    2619              :     // is_altering is reset implicitly because table_free + cloudsync_init_table
    2620              :     // will reallocate the table context with zero-initialized memory
    2621           23 :     table_remove(data, table);
    2622           23 :     table_free(table);
    2623           23 :     table = NULL;
    2624              : 
    2625              :     // init again cloudsync for the table
    2626           23 :     table_algo algo_current = dbutils_table_settings_get_algo(data, table_name);
    2627           23 :     if (algo_current == table_algo_none) algo_current = dbutils_table_settings_get_algo(data, "*");
    2628           23 :     rc = cloudsync_init_table(data, table_name, cloudsync_algo_name(algo_current), CLOUDSYNC_INIT_FLAG_SKIP_INT_PK_CHECK);
    2629           23 :     if (rc != DBRES_OK) goto rollback_finalize_alter;
    2630              : 
    2631           23 :     return DBRES_OK;
    2632              : 
    2633              : rollback_finalize_alter:
    2634            1 :     if (table) {
    2635            0 :         table_set_pknames(table, NULL);
    2636            0 :         table->is_altering = false;
    2637            0 :     }
    2638            1 :     return rc;
    2639           24 : }
    2640              : 
    2641              : // MARK: - Filter Rewrite -
    2642              : 
    2643              : // Replace bare column names in a filter expression with prefix-qualified names.
    2644              : // E.g., filter="user_id = 42", prefix="NEW", columns=["user_id","id"] → "NEW.\"user_id\" = 42"
    2645              : // Columns must be sorted by length descending by the caller to avoid partial matches.
    2646              : // Skips content inside single-quoted string literals.
    2647              : // Returns a newly allocated string (caller must free with cloudsync_memory_free), or NULL on error.
    2648              : // Helper: check if an identifier token matches a column name.
    2649          112 : static bool filter_is_column (const char *token, size_t token_len, char **columns, int ncols) {
    2650          450 :     for (int i = 0; i < ncols; ++i) {
    2651          388 :         if (strlen(columns[i]) == token_len && strncmp(token, columns[i], token_len) == 0)
    2652           50 :             return true;
    2653          338 :     }
    2654           62 :     return false;
    2655          112 : }
    2656              : 
    2657              : // Helper: check if character is part of a SQL identifier.
    2658          796 : static bool filter_is_ident_char (char c) {
    2659         1262 :     return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') ||
    2660          466 :            (c >= '0' && c <= '9') || c == '_';
    2661              : }
    2662              : 
    2663           40 : char *cloudsync_filter_add_row_prefix (const char *filter, const char *prefix, char **columns, int ncols) {
    2664           40 :     if (!filter || !prefix || !columns || ncols <= 0) return NULL;
    2665              : 
    2666           40 :     size_t filter_len = strlen(filter);
    2667           40 :     size_t prefix_len = strlen(prefix);
    2668              : 
    2669              :     // Each identifier match grows by at most (prefix_len + 3) bytes.
    2670              :     // Worst case: the entire filter is one repeated column reference separated by
    2671              :     // single characters, so up to (filter_len / 2) matches.  Use a safe upper bound.
    2672           40 :     size_t max_growth = (filter_len / 2 + 1) * (prefix_len + 3);
    2673           40 :     size_t cap = filter_len + max_growth + 64;
    2674           40 :     char *result = (char *)cloudsync_memory_alloc(cap);
    2675           40 :     if (!result) return NULL;
    2676           40 :     size_t out = 0;
    2677              : 
    2678              :     // Single pass: tokenize into identifiers, quoted strings, and everything else.
    2679           40 :     size_t i = 0;
    2680          336 :     while (i < filter_len) {
    2681              :         // Skip single-quoted string literals verbatim (handle '' escape)
    2682          296 :         if (filter[i] == '\'') {
    2683            6 :             result[out++] = filter[i++];
    2684           38 :             while (i < filter_len) {
    2685           38 :                 if (filter[i] == '\'') {
    2686            6 :                     result[out++] = filter[i++];
    2687              :                     // '' is an escaped quote — keep going
    2688            6 :                     if (i < filter_len && filter[i] == '\'') {
    2689            0 :                         result[out++] = filter[i++];
    2690            0 :                         continue;
    2691              :                     }
    2692            6 :                     break; // single ' ends the literal
    2693              :                 }
    2694           32 :                 result[out++] = filter[i++];
    2695              :             }
    2696            6 :             continue;
    2697              :         }
    2698              : 
    2699              :         // Extract identifier token
    2700          290 :         if (filter_is_ident_char(filter[i])) {
    2701          112 :             size_t start = i;
    2702          540 :             while (i < filter_len && filter_is_ident_char(filter[i])) ++i;
    2703          112 :             size_t token_len = i - start;
    2704              : 
    2705          112 :             if (filter_is_column(&filter[start], token_len, columns, ncols)) {
    2706              :                 // Emit PREFIX."column_name"
    2707           50 :                 memcpy(&result[out], prefix, prefix_len); out += prefix_len;
    2708           50 :                 result[out++] = '.';
    2709           50 :                 result[out++] = '"';
    2710           50 :                 memcpy(&result[out], &filter[start], token_len); out += token_len;
    2711           50 :                 result[out++] = '"';
    2712           50 :             } else {
    2713              :                 // Not a column — copy as-is
    2714           62 :                 memcpy(&result[out], &filter[start], token_len); out += token_len;
    2715              :             }
    2716          112 :             continue;
    2717              :         }
    2718              : 
    2719              :         // Any other character — copy as-is
    2720          178 :         result[out++] = filter[i++];
    2721              :     }
    2722              : 
    2723           40 :     result[out] = '\0';
    2724           40 :     return result;
    2725           40 : }
    2726              : 
    2727           24 : int cloudsync_reset_metatable (cloudsync_context *data, const char *table_name) {
    2728           24 :     cloudsync_table_context *table = table_lookup(data, table_name);
    2729           24 :     if (!table) return DBRES_ERROR;
    2730              : 
    2731           24 :     char *sql = cloudsync_memory_mprintf(SQL_DELETE_ALL_FROM_CLOUDSYNC_TABLE, table->meta_ref);
    2732           24 :     int rc = database_exec(data, sql);
    2733           24 :     cloudsync_memory_free(sql);
    2734           24 :     if (rc != DBRES_OK) return rc;
    2735              : 
    2736           24 :     return cloudsync_refill_metatable(data, table_name);
    2737           24 : }
    2738              : 
    2739          308 : int cloudsync_refill_metatable (cloudsync_context *data, const char *table_name) {
    2740          308 :     cloudsync_table_context *table = table_lookup(data, table_name);
    2741          308 :     if (!table) return DBRES_ERROR;
    2742              : 
    2743          308 :     dbvm_t *vm = NULL;
    2744          308 :     int64_t db_version = cloudsync_dbversion_next(data, CLOUDSYNC_VALUE_NOTSET);
    2745              : 
    2746              :     // Read row-level filter from settings (if any)
    2747              :     char filter_buf[2048];
    2748          308 :     int frc = dbutils_table_settings_get_value(data, table_name, "*", "filter", filter_buf, sizeof(filter_buf));
    2749          308 :     const char *filter = (frc == DBRES_OK && filter_buf[0]) ? filter_buf : NULL;
    2750              : 
    2751          308 :     const char *schema = table->schema ? table->schema : "";
    2752          308 :     char *sql = sql_build_pk_collist_query(schema, table_name);
    2753          308 :     char *pkclause_identifiers = NULL;
    2754          308 :     int rc = database_select_text(data, sql, &pkclause_identifiers);
    2755          308 :     cloudsync_memory_free(sql);
    2756          308 :     if (rc != DBRES_OK) goto finalize;
    2757          308 :     char *pkvalues_identifiers = (pkclause_identifiers) ? pkclause_identifiers : "rowid";
    2758              : 
    2759              :     // Use database-specific query builder to handle type differences in composite PKs
    2760          308 :     sql = sql_build_insert_missing_pks_query(schema, table_name, pkvalues_identifiers, table->base_ref, table->meta_ref, filter);
    2761          308 :     if (!sql) {rc = DBRES_NOMEM; goto finalize;}
    2762          308 :     rc = database_exec(data, sql);
    2763          308 :     cloudsync_memory_free(sql);
    2764          308 :     if (rc != DBRES_OK) goto finalize;
    2765              : 
    2766              :     // fill missing colums
    2767              :     // for each non-pk column:
    2768              :     // The new query does 1 encode per source row and one indexed NOT-EXISTS probe.
    2769              :     // The old plan does many decodes per candidate and can't use an index to rule out matches quickly—so it burns CPU and I/O.
    2770              : 
    2771          308 :     if (filter) {
    2772           20 :         sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL_FILTERED, pkvalues_identifiers, table->base_ref, filter, table->meta_ref);
    2773           20 :     } else {
    2774          288 :         sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL, pkvalues_identifiers, table->base_ref, table->meta_ref);
    2775              :     }
    2776          308 :     rc = databasevm_prepare(data, sql, (void **)&vm, DBFLAG_PERSISTENT);
    2777          308 :     cloudsync_memory_free(sql);
    2778          308 :     if (rc != DBRES_OK) goto finalize;
    2779              :      
    2780         1457 :     for (int i=0; i<table->ncols; ++i) {
    2781         1149 :         char *col_name = table->col_name[i];
    2782              : 
    2783         1149 :         rc = databasevm_bind_text(vm, 1, col_name, -1);
    2784         1149 :         if (rc != DBRES_OK) goto finalize;
    2785              : 
    2786         1187 :         while (1) {
    2787         1187 :             rc = databasevm_step(vm);
    2788         1187 :             if (rc == DBRES_ROW) {
    2789           38 :                 size_t pklen = 0;
    2790           38 :                 const void *pk = (const char *)database_column_blob(vm, 0, &pklen);
    2791           38 :                 if (!pk) { rc = DBRES_ERROR; break; }
    2792           38 :                 rc = local_mark_insert_or_update_meta(table, pk, pklen, col_name, db_version, cloudsync_bumpseq(data));
    2793         1187 :             } else if (rc == DBRES_DONE) {
    2794         1149 :                 rc = DBRES_OK;
    2795         1149 :                 break;
    2796              :             } else {
    2797            0 :                 break;
    2798              :             }
    2799              :         }
    2800         1149 :         if (rc != DBRES_OK) goto finalize;
    2801              : 
    2802         1149 :         databasevm_reset(vm);
    2803         1457 :     }
    2804              :     
    2805              : finalize:
    2806          308 :     if (rc != DBRES_OK) {DEBUG_ALWAYS("cloudsync_refill_metatable error: %s", database_errmsg(data));}
    2807          308 :     if (pkclause_identifiers) cloudsync_memory_free(pkclause_identifiers);
    2808          308 :     if (vm) databasevm_finalize(vm);
    2809          308 :     return rc;
    2810          308 : }
    2811              : 
    2812              : // MARK: - Local -
    2813              : 
    2814            4 : int local_update_sentinel (cloudsync_table_context *table, const void *pk, size_t pklen, int64_t db_version, int seq) {
    2815            4 :     dbvm_t *vm = table->meta_sentinel_update_stmt;
    2816            4 :     if (!vm) return -1;
    2817              :     
    2818            4 :     int rc = databasevm_bind_int(vm, 1, db_version);
    2819            4 :     if (rc != DBRES_OK) goto cleanup;
    2820              :     
    2821            4 :     rc = databasevm_bind_int(vm, 2, seq);
    2822            4 :     if (rc != DBRES_OK) goto cleanup;
    2823              :     
    2824            4 :     rc = databasevm_bind_blob(vm, 3, pk, (int)pklen);
    2825            4 :     if (rc != DBRES_OK) goto cleanup;
    2826              :     
    2827            4 :     rc = databasevm_step(vm);
    2828            4 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    2829              :     
    2830              : cleanup:
    2831            4 :     DEBUG_DBERROR(rc, "local_update_sentinel", table->context);
    2832            4 :     databasevm_reset(vm);
    2833            4 :     return rc;
    2834            4 : }
    2835              : 
    2836          125 : int local_mark_insert_sentinel_meta (cloudsync_table_context *table, const void *pk, size_t pklen, int64_t db_version, int seq) {
    2837          125 :     dbvm_t *vm = table->meta_sentinel_insert_stmt;
    2838          125 :     if (!vm) return -1;
    2839              :     
    2840          125 :     int rc = databasevm_bind_blob(vm, 1, pk, (int)pklen);
    2841          125 :     if (rc != DBRES_OK) goto cleanup;
    2842              :     
    2843          125 :     rc = databasevm_bind_int(vm, 2, db_version);
    2844          125 :     if (rc != DBRES_OK) goto cleanup;
    2845              :     
    2846          125 :     rc = databasevm_bind_int(vm, 3, seq);
    2847          125 :     if (rc != DBRES_OK) goto cleanup;
    2848              :     
    2849          125 :     rc = databasevm_bind_int(vm, 4, db_version);
    2850          125 :     if (rc != DBRES_OK) goto cleanup;
    2851              :     
    2852          125 :     rc = databasevm_bind_int(vm, 5, seq);
    2853          125 :     if (rc != DBRES_OK) goto cleanup;
    2854              :     
    2855          125 :     rc = databasevm_step(vm);
    2856          125 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    2857              :     
    2858              : cleanup:
    2859          125 :     DEBUG_DBERROR(rc, "local_insert_sentinel", table->context);
    2860          125 :     databasevm_reset(vm);
    2861          125 :     return rc;
    2862          125 : }
    2863              : 
    2864        11951 : int local_mark_insert_or_update_meta_impl (cloudsync_table_context *table, const void *pk, size_t pklen, const char *col_name, int col_version, int64_t db_version, int seq) {
    2865              :     
    2866        11951 :     dbvm_t *vm = table->meta_row_insert_update_stmt;
    2867        11951 :     if (!vm) return -1;
    2868              :     
    2869        11951 :     int rc = databasevm_bind_blob(vm, 1, pk, pklen);
    2870        11951 :     if (rc != DBRES_OK) goto cleanup;
    2871              :     
    2872        11951 :     rc = databasevm_bind_text(vm, 2, (col_name) ? col_name : CLOUDSYNC_TOMBSTONE_VALUE, -1);
    2873        11951 :     if (rc != DBRES_OK) goto cleanup;
    2874              :     
    2875        11951 :     rc = databasevm_bind_int(vm, 3, col_version);
    2876        11951 :     if (rc != DBRES_OK) goto cleanup;
    2877              :     
    2878        11951 :     rc = databasevm_bind_int(vm, 4, db_version);
    2879        11951 :     if (rc != DBRES_OK) goto cleanup;
    2880              :     
    2881        11951 :     rc = databasevm_bind_int(vm, 5, seq);
    2882        11951 :     if (rc != DBRES_OK) goto cleanup;
    2883              :     
    2884        11951 :     rc = databasevm_bind_int(vm, 6, db_version);
    2885        11951 :     if (rc != DBRES_OK) goto cleanup;
    2886              :     
    2887        11951 :     rc = databasevm_bind_int(vm, 7, seq);
    2888        11951 :     if (rc != DBRES_OK) goto cleanup;
    2889              :     
    2890        11951 :     rc = databasevm_step(vm);
    2891        11951 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    2892              :     
    2893              : cleanup:
    2894        11951 :     DEBUG_DBERROR(rc, "local_insert_or_update", table->context);
    2895        11951 :     databasevm_reset(vm);
    2896        11951 :     return rc;
    2897        11951 : }
    2898              : 
    2899        11846 : int local_mark_insert_or_update_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const char *col_name, int64_t db_version, int seq) {
    2900        11846 :     return local_mark_insert_or_update_meta_impl(table, pk, pklen, col_name, 1, db_version, seq);
    2901              : }
    2902              : 
    2903           41 : int local_mark_delete_block_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const char *block_colname, int64_t db_version, int seq) {
    2904              :     // Mark a block as deleted by setting col_version = 2 (even = deleted)
    2905           41 :     return local_mark_insert_or_update_meta_impl(table, pk, pklen, block_colname, 2, db_version, seq);
    2906              : }
    2907              : 
    2908           41 : int block_delete_value_external (cloudsync_context *data, cloudsync_table_context *table, const void *pk, size_t pklen, const char *block_colname) {
    2909           41 :     return block_delete_value(data, table, pk, (int)pklen, block_colname);
    2910              : }
    2911              : 
    2912           64 : int local_mark_delete_meta (cloudsync_table_context *table, const void *pk, size_t pklen, int64_t db_version, int seq) {
    2913           64 :     return local_mark_insert_or_update_meta_impl(table, pk, pklen, NULL, 2, db_version, seq);
    2914              : }
    2915              : 
    2916           36 : int local_drop_meta (cloudsync_table_context *table, const void *pk, size_t pklen) {
    2917           36 :     dbvm_t *vm = table->meta_row_drop_stmt;
    2918           36 :     if (!vm) return -1;
    2919              :     
    2920           36 :     int rc = databasevm_bind_blob(vm, 1, pk, pklen);
    2921           36 :     if (rc != DBRES_OK) goto cleanup;
    2922              :     
    2923           36 :     rc = databasevm_step(vm);
    2924           36 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    2925              :     
    2926              : cleanup:
    2927           36 :     DEBUG_DBERROR(rc, "local_drop_meta", table->context);
    2928           36 :     databasevm_reset(vm);
    2929           36 :     return rc;
    2930           36 : }
    2931              : 
    2932           28 : int local_update_move_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const void *pk2, size_t pklen2, int64_t db_version) {
    2933              :     /*
    2934              :       * This function moves non-sentinel metadata entries from an old primary key (OLD.pk)
    2935              :       * to a new primary key (NEW.pk) when a primary key change occurs.
    2936              :       *
    2937              :       * To ensure consistency and proper conflict resolution in a CRDT (Conflict-free Replicated Data Type) system,
    2938              :       * each non-sentinel metadata entry involved in the move must have a unique sequence value (seq).
    2939              :       *
    2940              :       * The `seq` is crucial for tracking the order of operations and for detecting and resolving conflicts
    2941              :       * during synchronization between replicas. Without a unique `seq` for each entry, concurrent updates
    2942              :       * may be applied incorrectly, leading to data inconsistency.
    2943              :       *
    2944              :       * When performing the update, a unique `seq` must be assigned to each metadata row. This can be achieved
    2945              :       * by either incrementing the maximum sequence value in the table or using a function (e.g., cloudsync_bumpseq(data))
    2946              :       * that generates a unique sequence for each row. The update query should ensure that each row moved
    2947              :       * from OLD.pk to NEW.pk gets a distinct `seq` to maintain proper versioning and ordering of changes.
    2948              :      */
    2949              :     
    2950              :     // see https://github.com/sqliteai/sqlite-sync/blob/main/docs/PriKey.md for more details
    2951              :     // pk2 is the old pk
    2952              :     
    2953           28 :     dbvm_t *vm = table->meta_update_move_stmt;
    2954           28 :     if (!vm) return -1;
    2955              :     
    2956              :     // new primary key
    2957           28 :     int rc = databasevm_bind_blob(vm, 1, pk, pklen);
    2958           28 :     if (rc != DBRES_OK) goto cleanup;
    2959              :     
    2960              :     // new db_version
    2961           28 :     rc = databasevm_bind_int(vm, 2, db_version);
    2962           28 :     if (rc != DBRES_OK) goto cleanup;
    2963              :     
    2964              :     // old primary key
    2965           28 :     rc = databasevm_bind_blob(vm, 3, pk2, pklen2);
    2966           28 :     if (rc != DBRES_OK) goto cleanup;
    2967              :     
    2968           28 :     rc = databasevm_step(vm);
    2969           28 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    2970              :     
    2971              : cleanup:
    2972           28 :     DEBUG_DBERROR(rc, "local_update_move_meta", table->context);
    2973           28 :     databasevm_reset(vm);
    2974           28 :     return rc;
    2975           28 : }
    2976              : 
    2977              : // MARK: - Payload Encode / Decode -
    2978              : 
    2979          687 : static void cloudsync_payload_checksum_store (cloudsync_payload_header *header, uint64_t checksum) {
    2980          687 :     uint64_t h = checksum & 0xFFFFFFFFFFFFULL; // keep 48 bits
    2981          687 :     header->checksum[0] = (uint8_t)(h >> 40);
    2982          687 :     header->checksum[1] = (uint8_t)(h >> 32);
    2983          687 :     header->checksum[2] = (uint8_t)(h >> 24);
    2984          687 :     header->checksum[3] = (uint8_t)(h >> 16);
    2985          687 :     header->checksum[4] = (uint8_t)(h >>  8);
    2986          687 :     header->checksum[5] = (uint8_t)(h >>  0);
    2987          687 : }
    2988              : 
    // Rebuild the 48-bit checksum from header->checksum[0..5].
    // checksum[0] is treated as the most significant byte, which is the same
    // byte order cloudsync_payload_checksum_store writes. The upper 16 bits of
    // the returned value are always zero.
    2989          682 : static uint64_t cloudsync_payload_checksum_load (cloudsync_payload_header *header) {
    2990         2046 :     return ((uint64_t)header->checksum[0] << 40) |
    2991         1364 :            ((uint64_t)header->checksum[1] << 32) |
    2992         1364 :            ((uint64_t)header->checksum[2] << 24) |
    2993         1364 :            ((uint64_t)header->checksum[3] << 16) |
    2994         1364 :            ((uint64_t)header->checksum[4] <<  8) |
    2995          682 :            ((uint64_t)header->checksum[5] <<  0);
    2996              : }
    2997              : 
    // Compare the checksum stored in the header with a freshly computed one.
    // `checksum` is truncated to its low 48 bits before the comparison because
    // only 48 bits are stored in the header.
    // Returns true when the two values are equal.
    2998          682 : static bool cloudsync_payload_checksum_verify (cloudsync_payload_header *header, uint64_t checksum) {
    2999          682 :     uint64_t checksum1 = cloudsync_payload_checksum_load(header);
    3000          682 :     uint64_t checksum2 = checksum & 0xFFFFFFFFFFFFULL;
    3001          682 :     return (checksum1 == checksum2);
    3002              : }
    3003              : 
    // Make sure the payload buffer can take `needed` more bytes, growing it if required.
    //
    // While no row has been encoded yet (nrows == 0) the request is enlarged by
    // sizeof(cloudsync_payload_header) so that room for the header is reserved.
    // When the buffer has to grow, it grows by max(needed, CLOUDSYNC_PAYLOAD_MINBUF_SIZE).
    //
    // Returns true when the space is available.
    // Returns false when the reallocation fails; in that case the previous buffer
    // is freed and the whole context is zeroed.
    3004        49184 : static bool cloudsync_payload_encode_check (cloudsync_payload_context *payload, size_t needed) {
    3005        49184 :     if (payload->nrows == 0) needed += sizeof(cloudsync_payload_header);
    3006              :     
    3007              :     // alloc/resize buffer
    3008        49184 :     if (payload->bused + needed > payload->balloc) {
    3009          696 :         if (needed < CLOUDSYNC_PAYLOAD_MINBUF_SIZE) needed = CLOUDSYNC_PAYLOAD_MINBUF_SIZE;
    3010          696 :         size_t balloc = payload->balloc + needed;
    3011              :         
    3012          696 :         char *buffer = cloudsync_memory_realloc(payload->buffer, balloc);
    3013          696 :         if (!buffer) {
                                    // the old pointer is released explicitly here, which presumes
                                    // cloudsync_memory_realloc leaves it valid on failure (standard
                                    // realloc semantics) — TODO confirm against its implementation
    3014            0 :             if (payload->buffer) cloudsync_memory_free(payload->buffer);
    3015            0 :             memset(payload, 0, sizeof(cloudsync_payload_context));
    3016            0 :             return false;
    3017              :         }
    3018              :         
    3019          696 :         payload->buffer = buffer;
    3020          696 :         payload->balloc = balloc;
                                // first allocation for this payload: row data starts right after the reserved header
    3021          696 :         if (payload->nrows == 0) payload->bused = sizeof(cloudsync_payload_header);
    3022          696 :     }
    3023              :     
    3024        49184 :     return true;
    3025        49184 : }
    3026              : 
    // Report the sizes of the payload structures.
    // Returns sizeof(cloudsync_payload_context). When header_size is non-NULL,
    // sizeof(cloudsync_payload_header) is also stored through it.
    3027        50567 : size_t cloudsync_payload_context_size (size_t *header_size) {
    3028        50567 :     if (header_size) *header_size = sizeof(cloudsync_payload_header);
    3029        50567 :     return sizeof(cloudsync_payload_context);
    3030              : }
    3031              : 
    // Initialize a payload header.
    // The header is zeroed first, then filled with: the payload signature, the
    // payload format version (CLOUDSYNC_PAYLOAD_VERSION_2), the library version
    // parsed from CLOUDSYNC_VERSION, and the expanded_size / ncols / nrows /
    // schema hash values passed in.
    // signature, expanded_size and nrows are converted with htonl, ncols with
    // htons and the hash with htonll. The checksum bytes are not set here; they
    // keep the zero value from the memset.
    3032          687 : void cloudsync_payload_header_init (cloudsync_payload_header *header, uint32_t expanded_size, uint16_t ncols, uint32_t nrows, uint64_t hash) {
    3033          687 :     memset(header, 0, sizeof(cloudsync_payload_header));
                            // NOTE(review): this is a runtime assert (it has no effect when NDEBUG is defined);
                            // a _Static_assert would enforce the 32-byte layout at build time instead.
    3034              :     assert(sizeof(cloudsync_payload_header)==32);
    3035              :     
                            // NOTE(review): the sscanf result is not checked. If CLOUDSYNC_VERSION did not contain
                            // three dot-separated integers, major/minor/patch would be read uninitialized below.
                            // Each component is also narrowed to uint8_t when stored.
    3036              :     int major, minor, patch;
    3037          687 :     sscanf(CLOUDSYNC_VERSION, "%d.%d.%d", &major, &minor, &patch);
    3038              :     
    3039          687 :     header->signature = htonl(CLOUDSYNC_PAYLOAD_SIGNATURE);
    3040          687 :     header->version = CLOUDSYNC_PAYLOAD_VERSION_2;
    3041          687 :     header->libversion[0] = (uint8_t)major;
    3042          687 :     header->libversion[1] = (uint8_t)minor;
    3043          687 :     header->libversion[2] = (uint8_t)patch;
    3044          687 :     header->expanded_size = htonl(expanded_size);
    3045          687 :     header->ncols = htons(ncols);
    3046          687 :     header->nrows = htonl(nrows);
    3047          687 :     header->schema_hash = htonll(hash);
    3048          687 : }
    3049              : 
    // Append one row (the argc values in argv) to the payload buffer.
    //
    // On the first call for a payload (nrows == 0) the column count is recorded
    // in payload->ncols. The space required is computed with pk_encode_size, the
    // buffer is grown through cloudsync_payload_encode_check, and the values are
    // then written with pk_encode at offset payload->bused.
    //
    // Returns DBRES_OK on success. On failure returns the result of
    // cloudsync_set_error(), called with DBRES_NOMEM when the buffer cannot be
    // grown and with DBRES_ERROR when pk_encode returns NULL.
    3050        49184 : int cloudsync_payload_encode_step (cloudsync_payload_context *payload, cloudsync_context *data, int argc, dbvalue_t **argv) {
    3051              :     DEBUG_FUNCTION("cloudsync_payload_encode_step");
    3052              :     // debug_values(argc, argv);
    3053              :     
    3054              :     // check if the step function is called for the first time
                            // NOTE(review): argc is narrowed to uint16_t without a range check
    3055        49184 :     if (payload->nrows == 0) payload->ncols = (uint16_t)argc;
    3056              :     
    3057        49184 :     size_t breq = pk_encode_size((dbvalue_t **)argv, argc, 0, data->skip_decode_idx);
    3058        49184 :     if (cloudsync_payload_encode_check(payload, breq) == false) {
    3059            0 :         return cloudsync_set_error(data, "Not enough memory to resize payload internal buffer", DBRES_NOMEM);
    3060              :     }
    3061              :     
                            // encode into the free tail of the buffer; bsize tells pk_encode how much room is left
    3062        49184 :     char *buffer = payload->buffer + payload->bused;
    3063        49184 :     size_t bsize = payload->balloc - payload->bused;
    3064        49184 :     char *p = pk_encode((dbvalue_t **)argv, argc, buffer, false, &bsize, data->skip_decode_idx);
    3065        49184 :     if (!p) return cloudsync_set_error(data, "An error occurred while encoding payload", DBRES_ERROR);
    3066              :     
    3067              :     // update buffer
                            // the offset advances by the size computed up front; bsize is not read again after pk_encode
    3068        49184 :     payload->bused += breq;
    3069              :     
    3070              :     // increment row counter
    3071        49184 :     ++payload->nrows;
    3072              :     
    3073        49184 :     return DBRES_OK;
    3074        49184 : }
    3075              : 
    // Finish a payload: compress the encoded rows when that pays off, build the
    // header, checksum the data and leave "header + data" in payload->buffer with
    // the total size in payload->bsize.
    //
    // - With no rows encoded, any buffer is freed, bsize is set to 0 and DBRES_OK is returned.
    // - The row data (everything after the reserved header) is compressed with
    //   LZ4_compress_default into a separate buffer. If that buffer cannot be
    //   allocated, compression fails, or the result is larger than the input, the
    //   original uncompressed buffer is used instead.
    // - header.expanded_size is the uncompressed data size when the data is
    //   compressed and 0 when it is stored uncompressed.
    //
    // Returns DBRES_OK on success. Returns DBRES_ERROR (after cloudsync_set_error)
    // when the row count exceeds UINT32_MAX or the data size is negative or above
    // INT_MAX; in those cases the buffer is freed and bsize is set to 0.
    3076          695 : int cloudsync_payload_encode_final (cloudsync_payload_context *payload, cloudsync_context *data) {
    3077              :     DEBUG_FUNCTION("cloudsync_payload_encode_final");
    3078              :     
                            // nothing was encoded: release any buffer and report an empty payload as success
    3079          695 :     if (payload->nrows == 0) {
    3080            8 :         if (payload->buffer) cloudsync_memory_free(payload->buffer);
    3081            8 :         payload->buffer = NULL;
    3082            8 :         payload->bsize = 0;
    3083            8 :         return DBRES_OK;
    3084              :     }
    3085              :     
                            // the header stores nrows as a 32-bit value (see the uint32_t cast further down)
    3086          687 :     if (payload->nrows > UINT32_MAX) {
    3087            0 :         if (payload->buffer) cloudsync_memory_free(payload->buffer);
    3088            0 :         payload->buffer = NULL;
    3089            0 :         payload->bsize = 0;
    3090            0 :         cloudsync_set_error(data, "Maximum number of payload rows reached", DBRES_ERROR);
    3091            0 :         return DBRES_ERROR;
    3092              :     }
    3093              :     
    3094              :     // sanity check about buffer size
                            // buffer_size is the number of encoded data bytes, i.e. bused minus the reserved header
    3095          687 :     int header_size = (int)sizeof(cloudsync_payload_header);
    3096          687 :     int64_t buffer_size = (int64_t)payload->bused - (int64_t)header_size;
    3097          687 :     if (buffer_size < 0) {
    3098            0 :         if (payload->buffer) cloudsync_memory_free(payload->buffer);
    3099            0 :         payload->buffer = NULL;
    3100            0 :         payload->bsize = 0;
    3101            0 :         cloudsync_set_error(data, "cloudsync_encode: internal size underflow", DBRES_ERROR);
    3102            0 :         return DBRES_ERROR;
    3103              :     }
                            // the LZ4 calls below take int sizes, so anything above INT_MAX is rejected
    3104          687 :     if (buffer_size > INT_MAX) {
    3105            0 :         if (payload->buffer) cloudsync_memory_free(payload->buffer);
    3106            0 :         payload->buffer = NULL;
    3107            0 :         payload->bsize = 0;
    3108            0 :         cloudsync_set_error(data, "cloudsync_encode: payload too large to compress (INT_MAX limit)", DBRES_ERROR);
    3109            0 :         return DBRES_ERROR;
    3110              :     }
    3111              :     // try to allocate buffer used for compressed data
                            // NOTE(review): zbound + header_size is an int addition with no overflow check — confirm
                            // LZ4_compressBound's maximum for inputs close to INT_MAX
    3112          687 :     int real_buffer_size = (int)buffer_size;
    3113          687 :     int zbound = LZ4_compressBound(real_buffer_size);
    3114          687 :     char *zbuffer = cloudsync_memory_alloc(zbound + header_size); // if the allocation fails for any reason, compression is simply skipped
    3115              :     
    3116              :     // skip the reserved header from the buffer to compress
                            // the compressed bytes are written after header_size bytes of zbuffer, leaving room for the header
    3117          687 :     char *src_buffer = payload->buffer + sizeof(cloudsync_payload_header);
    3118          687 :     int zused = (zbuffer) ? LZ4_compress_default(src_buffer, zbuffer+header_size, real_buffer_size, zbound) : 0;
    3119          687 :     bool use_uncompressed_buffer = (!zused || zused > real_buffer_size);
                            // CHECK_FORCE_UNCOMPRESSED_BUFFER is defined elsewhere — presumably a test hook that can
                            // override use_uncompressed_buffer; confirm against its definition
    3120          687 :     CHECK_FORCE_UNCOMPRESSED_BUFFER();
    3121              :     
    3122              :     // setup payload header
                            // expanded_size == 0 marks the data as uncompressed: cloudsync_payload_apply only
                            // decompresses when this field is non-zero
    3123          687 :     cloudsync_payload_header header = {0};
    3124          687 :     uint32_t expanded_size = (use_uncompressed_buffer) ? 0 : real_buffer_size;
    3125          687 :     cloudsync_payload_header_init(&header, expanded_size, payload->ncols, (uint32_t)payload->nrows, data->schema_hash);
    3126              :     
    3127              :     // if compression fails or if compressed size is bigger than original buffer, then use the uncompressed buffer
                            // payload->buffer already has header_size bytes reserved in front of the data,
                            // so it can be used as the output buffer as it is
    3128          687 :     if (use_uncompressed_buffer) {
    3129           16 :         if (zbuffer) cloudsync_memory_free(zbuffer);
    3130           16 :         zbuffer = payload->buffer;
    3131           16 :         zused = real_buffer_size;
    3132           16 :     }
    3133              :     
    3134              :     // compute checksum of the buffer
                            // the checksum covers only the data after the header (compressed or not), never the header itself
    3135          687 :     uint64_t checksum = pk_checksum(zbuffer + header_size, zused);
    3136          687 :     cloudsync_payload_checksum_store(&header, checksum);
    3137              :     
    3138              :     // copy header and data to SQLite BLOB
    3139          687 :     memcpy(zbuffer, &header, sizeof(cloudsync_payload_header));
    3140          687 :     int blob_size = zused + sizeof(cloudsync_payload_header);
    3141          687 :     payload->bsize = blob_size;
    3142              :     
    3143              :     // cleanup memory
                            // compressed case: the original buffer is no longer needed and zbuffer becomes the payload buffer
    3144          687 :     if (zbuffer != payload->buffer) {
    3145          671 :         cloudsync_memory_free (payload->buffer);
    3146          671 :         payload->buffer = zbuffer;
    3147          671 :     }
    3148              :     
    3149          687 :     return DBRES_OK;
    3150          695 : }
    3151              : 
    // Accessor for the payload contents.
    // Returns payload->buffer (which cloudsync_payload_encode_final sets to NULL
    // when no rows were encoded). When the out-parameters are non-NULL, *blob_size
    // receives payload->bsize and *nrows receives payload->nrows.
    3152          695 : char *cloudsync_payload_blob (cloudsync_payload_context *payload, int64_t *blob_size, int64_t *nrows) {
    3153              :     DEBUG_FUNCTION("cloudsync_payload_blob");
    3154              :     
    3155          695 :     if (blob_size) *blob_size = (int64_t)payload->bsize;
    3156          695 :     if (nrows) *nrows = (int64_t)payload->nrows;
    3157          695 :     return payload->buffer;
    3158              : }
    3159              : 
    // Per-value callback passed to pk_decode by cloudsync_payload_apply.
    //
    // xdata is a cloudsync_pk_decode_bind_context. Each decoded value is first
    // forwarded to pk_decode_bind_callback with the context's vm. If that returns
    // DBRES_OK, the value is also recorded in the context field that matches
    // `index`, but only when `type` is the type expected for that column.
    // For TEXT/BLOB columns the pointer pval and the length ival are stored as
    // given; the data is not copied.
    //
    // Returns the result of pk_decode_bind_callback.
    3160       441936 : static int cloudsync_payload_decode_callback (void *xdata, int index, int type, int64_t ival, double dval, char *pval) {
    3161       441936 :     cloudsync_pk_decode_bind_context *decode_context = (cloudsync_pk_decode_bind_context*)xdata;
    3162       441936 :     int rc = pk_decode_bind_callback(decode_context->vm, index, type, ival, dval, pval);
    3163              :     
    3164       441936 :     if (rc == DBRES_OK) {
    3165              :         // record the decoded value in the decode_context field that matches the column index
    3166              :         // a value is stored only when it has the type expected for that column
    3167              :         // no comparison with previous values is made here: the fields simply hold the values of the row being decoded
                                // indexes not listed below are not recorded in the context
    3168       441936 :         switch (index) {
    3169              :             case CLOUDSYNC_PK_INDEX_TBL:
    3170        49104 :                 if (type == DBTYPE_TEXT) {
    3171        49104 :                     decode_context->tbl = pval;
    3172        49104 :                     decode_context->tbl_len = ival;
    3173        49104 :                 }
    3174        49104 :                 break;
    3175              :             case CLOUDSYNC_PK_INDEX_PK:
    3176        49104 :                 if (type == DBTYPE_BLOB) {
    3177        49104 :                     decode_context->pk = pval;
    3178        49104 :                     decode_context->pk_len = ival;
    3179        49104 :                 }
    3180        49104 :                 break;
    3181              :             case CLOUDSYNC_PK_INDEX_COLNAME:
    3182        49104 :                 if (type == DBTYPE_TEXT) {
    3183        49104 :                     decode_context->col_name = pval;
    3184        49104 :                     decode_context->col_name_len = ival;
    3185        49104 :                 }
    3186        49104 :                 break;
    3187              :             case CLOUDSYNC_PK_INDEX_COLVERSION:
    3188        49104 :                 if (type == DBTYPE_INTEGER) decode_context->col_version = ival;
    3189        49104 :                 break;
    3190              :             case CLOUDSYNC_PK_INDEX_DBVERSION:
    3191        49104 :                 if (type == DBTYPE_INTEGER) decode_context->db_version = ival;
    3192        49104 :                 break;
    3193              :             case CLOUDSYNC_PK_INDEX_SITEID:
    3194        49104 :                 if (type == DBTYPE_BLOB) {
    3195        49104 :                     decode_context->site_id = pval;
    3196        49104 :                     decode_context->site_id_len = ival;
    3197        49104 :                 }
    3198        49104 :                 break;
    3199              :             case CLOUDSYNC_PK_INDEX_CL:
    3200        49104 :                 if (type == DBTYPE_INTEGER) decode_context->cl = ival;
    3201        49104 :                 break;
    3202              :             case CLOUDSYNC_PK_INDEX_SEQ:
    3203        49104 :                 if (type == DBTYPE_INTEGER) decode_context->seq = ival;
    3204        49104 :                 break;
    3205              :         }
    3206       441936 :     }
    3207              :         
    3208       441936 :     return rc;
    3209              : }
    3210              : 
    3211              : // #ifndef CLOUDSYNC_OMIT_RLS_VALIDATION
    3212              : 
    // Apply a serialized payload (header followed by row data, the layout written
    // by cloudsync_payload_encode_final) to the local database.
    //
    // Steps, in order:
    //   1. fail if the context is not initialized or blen is smaller than the header;
    //   2. copy the header out of `payload` and convert its multi-byte fields to host order;
    //   3. validate the schema hash (unless disabled in the settings), then the
    //      signature and column count, then the checksum of the data;
    //   4. when header.expanded_size != 0, LZ4-decompress the data into a temporary buffer;
    //   5. decode header.nrows rows one at a time, binding each to the
    //      SQL_CHANGES_INSERT_ROW statement and stepping it, flushing the pending
    //      merge batch whenever the pk, table or db_version changes;
    //   6. on success, update the CLOUDSYNC_KEY_CHECK_DBVERSION and
    //      CLOUDSYNC_KEY_CHECK_SEQ settings from the last decoded row.
    //
    // Returns DBRES_OK on success and, when pnrows is non-NULL, stores header.nrows in it.
    // On failure a non-OK code is returned and *pnrows is left untouched.
    3213          685 : int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int blen, int *pnrows) {
    3214              :     // Guard against calling payload_apply before cloudsync_init: without this,
    3215              :     // the settings lookups at the top of this function would each emit a
    3216              :     // "no such table: cloudsync_settings" debug line, control would fall
    3217              :     // through to the meta-table insert, and the function would ultimately
    3218              :     // return an error with an empty errmsg — SQLite then surfaces that as
    3219              :     // the confusing "Runtime error: not an error".
    3220          685 :     if (!cloudsync_context_is_initialized(data)) {
    3221            1 :         return cloudsync_set_error(data,
    3222              :             "cloudsync is not initialized: call SELECT cloudsync_init('<table_name>') "
    3223              :             "to enable sync on a table before calling cloudsync_payload_apply().",
    3224              :             DBRES_MISUSE);
    3225              :     }
    3226              : 
    3227              :     // sanity check
    3228          684 :     if (blen < (int)sizeof(cloudsync_payload_header)) return cloudsync_set_error(data, "Error on cloudsync_payload_apply: invalid payload length", DBRES_MISUSE);
    3229              :     
    3230              :     // decode header
                            // the header is copied into a local struct so the caller's buffer is never modified
    3231              :     cloudsync_payload_header header;
    3232          684 :     memcpy(&header, payload, sizeof(cloudsync_payload_header));
    3233              :     
    3234          684 :     header.signature = ntohl(header.signature);
    3235          684 :     header.expanded_size = ntohl(header.expanded_size);
    3236          684 :     header.ncols = ntohs(header.ncols);
    3237          684 :     header.nrows = ntohl(header.nrows);
    3238          684 :     header.schema_hash = ntohll(header.schema_hash);
    3239              :     
    3240              :     // compare schema_hash only if not disabled and if the received payload was created with the current header version
    3241              :     // to avoid schema hash mismatch when processed by a peer with a different extension version during software updates.
                            // NOTE(review): this check runs before the signature is validated further down, so a buffer
                            // that is not a payload at all can be reported as a schema hash problem.
    3242          684 :     if (dbutils_settings_get_int64_value(data, CLOUDSYNC_KEY_SKIP_SCHEMA_HASH_CHECK) == 0 && header.version == CLOUDSYNC_PAYLOAD_VERSION_LATEST ) {
    3243          684 :         if (header.schema_hash != data->schema_hash) {
    3244            4 :             if (!database_check_schema_hash(data, header.schema_hash)) {
    3245              :                 char buffer[1024];
                                        // NOTE(review): assuming schema_hash is a uint64_t, "%llu" is not the matching
                                        // conversion on every platform; "%" PRIu64 from <inttypes.h> would be (PRId64
                                        // is already used later in this function).
    3246            2 :                 snprintf(buffer, sizeof(buffer), "Cannot apply the received payload because the schema hash is unknown %llu.", header.schema_hash);
    3247            2 :                 return cloudsync_set_error(data, buffer, DBRES_MISUSE);
    3248              :             }
    3249            2 :         }
    3250          682 :     }
    3251              :     
    3252              :     // sanity check header
    3253          682 :     if ((header.signature != CLOUDSYNC_PAYLOAD_SIGNATURE) || (header.ncols == 0)) {
    3254            0 :         return cloudsync_set_error(data, "Error on cloudsync_payload_apply: invalid signature or column size", DBRES_MISUSE);
    3255              :     }
    3256              :     
                            // buffer/buf_len describe the data that follows the header
    3257          682 :     const char *buffer = payload + sizeof(cloudsync_payload_header);
    3258          682 :     size_t buf_len = (size_t)blen - sizeof(cloudsync_payload_header);
    3259              : 
    3260              :     // sanity check checksum (only if version is >= 2)
                            // the checksum is computed over the data exactly as received (still compressed, if it is)
    3261          682 :     if (header.version >= CLOUDSYNC_PAYLOAD_MIN_VERSION_WITH_CHECKSUM) {
    3262          682 :         uint64_t checksum = pk_checksum(buffer, buf_len);
    3263          682 :         if (cloudsync_payload_checksum_verify(&header, checksum) == false) {
    3264            1 :             return cloudsync_set_error(data, "Error on cloudsync_payload_apply: invalid checksum", DBRES_MISUSE);
    3265              :         }
    3266          681 :     }
    3267              : 
    3268              :     // check if payload is compressed
                            // a non-zero expanded_size means the data is LZ4-compressed and gives the uncompressed size
                            // NOTE(review): the allocation size comes straight from the payload header
    3269          681 :     char *clone = NULL;
    3270          681 :     if (header.expanded_size != 0) {
    3271          665 :         clone = (char *)cloudsync_memory_alloc(header.expanded_size);
    3272          665 :         if (!clone) return cloudsync_set_error(data, "Unable to allocate memory to uncompress payload", DBRES_NOMEM);
    3273              : 
                                // decompression must produce exactly expanded_size bytes, anything else is treated as an error
    3274          665 :         int lz4_rc = LZ4_decompress_safe(buffer, clone, (int)buf_len, (int)header.expanded_size);
    3275          665 :         if (lz4_rc <= 0 || (uint32_t)lz4_rc != header.expanded_size) {
    3276            0 :             if (clone) cloudsync_memory_free(clone);
    3277            0 :             return cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to decompress BLOB", DBRES_MISUSE);
    3278              :         }
    3279              : 
    3280          665 :         buffer = (const char *)clone;
    3281          665 :         buf_len = (size_t)header.expanded_size;
    3282          665 :     }
    3283              :     
    3284              :     // precompile the insert statement
    3285          681 :     dbvm_t *vm = NULL;
    3286          681 :     int rc = databasevm_prepare(data, SQL_CHANGES_INSERT_ROW, &vm, 0);
    3287          681 :     if (rc != DBRES_OK) {
    3288            0 :         if (clone) cloudsync_memory_free(clone);
    3289            0 :         return cloudsync_set_error(data, "Error on cloudsync_payload_apply: error while compiling SQL statement", rc);
    3290              :     }
    3291              :     
    3292              :     // process buffer, one row at a time
                            // dbversion and seq are the values currently stored in the settings; they are compared
                            // with the last decoded row after the loop
    3293          681 :     uint16_t ncols = header.ncols;
    3294          681 :     uint32_t nrows = header.nrows;
    3295          681 :     int64_t last_payload_db_version = -1;
    3296          681 :     int dbversion = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_CHECK_DBVERSION);
    3297          681 :     int seq = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_CHECK_SEQ);
    3298          681 :     cloudsync_pk_decode_bind_context decoded_context = {.vm = vm};
    3299              : 
    3300              :     // Initialize deferred column-batch merge
                            // batch lives on this function's stack; data->pending_batch is reset to NULL before
                            // the function returns from any path past this point
    3301          681 :     merge_pending_batch batch = {0};
    3302          681 :     data->pending_batch = &batch;
    3303          681 :     bool in_savepoint = false;
    3304          681 :     const void *last_pk = NULL;
    3305          681 :     int64_t last_pk_len = 0;
    3306          681 :     const char *last_tbl = NULL;
    3307          681 :     int64_t last_tbl_len = 0;
    3308              : 
    3309        49785 :     for (uint32_t i=0; i<nrows; ++i) {
                                // seek receives the number of bytes consumed by this row; it is used to advance
                                // buffer at the end of the iteration
    3310        49104 :         size_t seek = 0;
    3311        49104 :         int res = pk_decode((char *)buffer, buf_len, ncols, &seek, data->skip_decode_idx, cloudsync_payload_decode_callback, &decoded_context);
    3312        49104 :         if (res == -1) {
                                    // NOTE(review): this path returns DBRES_ERROR without calling cloudsync_set_error,
                                    // and the result of merge_flush_pending is ignored
    3313            0 :             merge_flush_pending(data);
    3314            0 :             data->pending_batch = NULL;
    3315            0 :             if (batch.cached_vm) { databasevm_finalize(batch.cached_vm); batch.cached_vm = NULL; }
    3316            0 :             if (batch.cached_col_names) { cloudsync_memory_free(batch.cached_col_names); batch.cached_col_names = NULL; }
    3317            0 :             if (batch.entries) { cloudsync_memory_free(batch.entries); batch.entries = NULL; }
    3318            0 :             if (in_savepoint) database_rollback_savepoint(data, "cloudsync_payload_apply");
    3319            0 :             rc = DBRES_ERROR;
    3320            0 :             goto cleanup;
    3321              :         }
    3322              : 
    3323              :         // Detect PK/table/db_version boundary to flush pending batch
                                // on the first row last_pk and last_tbl are NULL, so only db_version_changed can be
                                // true (last_payload_db_version starts at -1)
    3324        97527 :         bool pk_changed = (last_pk != NULL &&
    3325        48423 :                            (last_pk_len != decoded_context.pk_len ||
    3326        46497 :                             memcmp(last_pk, decoded_context.pk, last_pk_len) != 0));
    3327        97527 :         bool tbl_changed = (last_tbl != NULL &&
    3328        48423 :                             (last_tbl_len != decoded_context.tbl_len ||
    3329        48351 :                              memcmp(last_tbl, decoded_context.tbl, last_tbl_len) != 0));
    3330        49104 :         bool db_version_changed = (last_payload_db_version != decoded_context.db_version);
    3331              : 
    3332              :         // Flush pending batch before any boundary change
    3333        49104 :         if (pk_changed || tbl_changed || db_version_changed) {
    3334        18227 :             int flush_rc = merge_flush_pending(data);
    3335        18227 :             if (flush_rc != DBRES_OK) {
                                        // NOTE(review): rc is assigned again later in this same iteration (by
                                        // databasevm_step, and by database_commit_savepoint when the db_version
                                        // changed), so the flush error stored here does not survive the iteration
    3336            1 :                 rc = flush_rc;
    3337              :                 // continue processing remaining rows
    3338            1 :             }
    3339        18227 :         }
    3340              : 
    3341              :         // Per-db_version savepoints group rows with the same source db_version
    3342              :         // into one transaction. In SQLite autocommit mode, the RELEASE triggers
    3343              :         // the commit hook which bumps data->db_version and resets seq, ensuring
    3344              :         // unique (db_version, seq) tuples across groups. In PostgreSQL SPI,
    3345              :         // database_in_transaction() is always true so this block is inactive —
    3346              :         // the inner per-PK savepoint in merge_flush_pending handles RLS instead.
    3347        49104 :         if (in_savepoint && db_version_changed) {
    3348         4290 :             rc = database_commit_savepoint(data, "cloudsync_payload_apply");
    3349         4290 :             if (rc != DBRES_OK) {
    3350            0 :                 merge_pending_free_entries(&batch);
    3351            0 :                 data->pending_batch = NULL;
    3352            0 :                 cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to release a savepoint", rc);
    3353            0 :                 goto cleanup;
    3354              :             }
    3355         4290 :             in_savepoint = false;
    3356         4290 :         }
    3357              : 
                                // open a savepoint for the new db_version group, unless a transaction is already active
    3358        49104 :         if (!in_savepoint && db_version_changed && !database_in_transaction(data)) {
    3359         4971 :             rc = database_begin_savepoint(data, "cloudsync_payload_apply");
    3360         4971 :             if (rc != DBRES_OK) {
    3361            0 :                 merge_pending_free_entries(&batch);
    3362            0 :                 data->pending_batch = NULL;
    3363            0 :                 cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to start a transaction", rc);
    3364            0 :                 goto cleanup;
    3365              :             }
    3366         4971 :             in_savepoint = true;
    3367         4971 :         }
    3368              : 
    3369              :         // Track db_version for batch-flush boundary detection
    3370        49104 :         if (db_version_changed) {
    3371         4971 :             last_payload_db_version = decoded_context.db_version;
    3372         4971 :         }
    3373              : 
    3374              :         // Update PK/table tracking
                                // only the pointers and lengths recorded by the decode callback are remembered here
    3375        49104 :         last_pk = decoded_context.pk;
    3376        49104 :         last_pk_len = decoded_context.pk_len;
    3377        49104 :         last_tbl = decoded_context.tbl;
    3378        49104 :         last_tbl_len = decoded_context.tbl_len;
    3379              : 
    3380        49104 :         rc = databasevm_step(vm);
    3381        49104 :         if (rc != DBRES_DONE) {
    3382              :             // don't "break;", the error can be due to an RLS policy.
    3383              :             // in case of error we try to apply the following changes
    3384            2 :         }
    3385              : 
                                // move past the bytes consumed by this row and make the statement reusable for the next one
    3386        49104 :         buffer += seek;
    3387        49104 :         buf_len -= seek;
    3388        49104 :         dbvm_reset(vm);
    3389        49104 :     }
    3390              : 
    3391              :     // Final flush after loop
                            // a flush error is kept only when rc is exactly DBRES_OK at this point
    3392              :     {
    3393          681 :         int flush_rc = merge_flush_pending(data);
    3394          681 :         if (flush_rc != DBRES_OK && rc == DBRES_OK) rc = flush_rc;
    3395              :     }
    3396          681 :     data->pending_batch = NULL;
    3397              : 
    3398          681 :     if (in_savepoint) {
    3399          681 :         int rc1 = database_commit_savepoint(data, "cloudsync_payload_apply");
    3400          681 :         if (rc1 != DBRES_OK) rc = rc1;
    3401          681 :     }
    3402              : 
    3403              :     // save last error (unused if function returns OK)
    3404          681 :     if (rc != DBRES_OK && rc != DBRES_DONE) {
    3405            1 :         cloudsync_set_dberror(data);
    3406            1 :     }
    3407              : 
    3408          681 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    3409         1361 :     if (rc == DBRES_OK) {
                                // decoded_context now holds the db_version and seq of the last decoded row;
                                // the settings are only written when that db_version is >= the stored one
    3410              :         char buf[256];
    3411          680 :         if (decoded_context.db_version >= dbversion) {
    3412          546 :             snprintf(buf, sizeof(buf), "%" PRId64, decoded_context.db_version);
    3413          546 :             dbutils_settings_set_key_value(data, CLOUDSYNC_KEY_CHECK_DBVERSION, buf);
    3414              :             
    3415          546 :             if (decoded_context.seq != seq) {
    3416          338 :                 snprintf(buf, sizeof(buf), "%" PRId64, decoded_context.seq);
    3417          338 :                 dbutils_settings_set_key_value(data, CLOUDSYNC_KEY_CHECK_SEQ, buf);
    3418          338 :             }
    3419          546 :         }
    3420          680 :     }
    3421              : 
    3422              : cleanup:
    3423              :     // cleanup merge_pending_batch
    3424          681 :     if (batch.cached_vm) { databasevm_finalize(batch.cached_vm); batch.cached_vm = NULL; }
    3425          681 :     if (batch.cached_col_names) { cloudsync_memory_free(batch.cached_col_names); batch.cached_col_names = NULL; }
    3426          681 :     if (batch.entries) { cloudsync_memory_free(batch.entries); batch.entries = NULL; }  
    3427              : 
    3428              :     // cleanup vm
    3429          681 :     if (vm) databasevm_finalize(vm);
    3430              : 
    3431              :     // cleanup memory
    3432          681 :     if (clone) cloudsync_memory_free(clone);
    3433              : 
    3434              :     // error already saved in (save last error)
    3435          681 :     if (rc != DBRES_OK) return rc;
    3436              : 
    3437              :     // return the number of processed rows
                            // this is the row count announced by the header, including rows whose insert step failed
    3438          680 :     if (pnrows) *pnrows = nrows;
    3439          680 :     return DBRES_OK;
    3440          685 : }
    3441              : 
    3442              : // MARK: - Payload load/store -
    3443              : 
    // Build the payload BLOB with the local changes that are newer than the
    // last sent db_version.
    //
    // *db_version receives the CLOUDSYNC_KEY_SEND_DBVERSION setting; a negative
    // value makes the function return DBRES_ERROR. The query then encodes, via
    // cloudsync_payload_encode(), the cloudsync_changes rows of this site whose
    // db_version is greater than that value, and also selects the maximum
    // db_version of this site.
    //
    // Outputs: *blob and *blob_size receive the BLOB and its length, and
    // new_db_version is handed to database_select_blob_int for the second column.
    // db_version, blob and blob_size are dereferenced unconditionally, so they
    // must not be NULL.
    //
    // Returns the result of database_select_blob_int.
    3444            0 : int cloudsync_payload_get (cloudsync_context *data, char **blob, int *blob_size, int *db_version, int64_t *new_db_version) {
    3445              :     // retrieve current db_version and seq
    3446            0 :     *db_version = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_SEND_DBVERSION);
    3447            0 :     if (*db_version < 0) return DBRES_ERROR;
    3448              :     
    3449              :     // retrieve BLOB
                            // only the integer *db_version is formatted into the SQL text
    3450              :     char sql[1024];
    3451            0 :     snprintf(sql, sizeof(sql), "WITH max_db_version AS (SELECT MAX(db_version) AS max_db_version FROM cloudsync_changes WHERE site_id=cloudsync_siteid()) "
    3452              :                                "SELECT * FROM (SELECT cloudsync_payload_encode(tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq) AS payload, max_db_version AS max_db_version FROM cloudsync_changes, max_db_version WHERE site_id=cloudsync_siteid() AND db_version>%d) WHERE payload IS NOT NULL", *db_version);
    3453              :     
                            // NOTE(review): the int64_t length is narrowed to int, and *blob_size is written even
                            // when the query failed
    3454            0 :     int64_t len = 0;
    3455            0 :     int rc = database_select_blob_int(data, sql, blob, &len, new_db_version);
    3456            0 :     *blob_size = (int)len;
    3457            0 :     if (rc != DBRES_OK) return rc;
    3458              :     
    3459              :     // exit if there is no data to send
                            // NOTE(review): rc is DBRES_OK here, so both of the following returns yield the same value
    3460            0 :     if (*blob == NULL || *blob_size == 0) return DBRES_OK;
    3461            0 :     return rc;
    3462            0 : }
    3463              : 
    3464              : #ifdef CLOUDSYNC_DESKTOP_OS
    // Write the current payload of local changes to a file.
    //
    // Any existing file at payload_path is deleted first. The payload is obtained
    // with cloudsync_payload_get and written with cloudsync_file_write, after
    // which the BLOB is freed. When size is non-NULL it receives the number of
    // bytes written, or 0 when there was nothing to save.
    //
    // Returns DBRES_OK on success, including when there is no data to save.
    // On failure returns the result of cloudsync_set_error(), called with the
    // code from cloudsync_payload_get or with DBRES_IOERR when the write fails.
    3465            0 : int cloudsync_payload_save (cloudsync_context *data, const char *payload_path, int *size) {
    3466              :     DEBUG_FUNCTION("cloudsync_payload_save");
    3467              :     
    3468              :     // silently delete any other payload with the same name
    3469            0 :     cloudsync_file_delete(payload_path);
    3470              :     
    3471              :     // retrieve payload
    3472            0 :     char *blob = NULL;
    3473            0 :     int blob_size = 0, db_version = 0;
    3474            0 :     int64_t new_db_version = 0;
    3475            0 :     int rc = cloudsync_payload_get(data, &blob, &blob_size, &db_version, &new_db_version);
    3476            0 :     if (rc != DBRES_OK) {
                                // cloudsync_payload_get returns DBRES_ERROR early when the stored db_version is negative
    3477            0 :         if (db_version < 0) return cloudsync_set_error(data, "Unable to retrieve db_version", rc);
    3478            0 :         return cloudsync_set_error(data, "Unable to retrieve changes in cloudsync_payload_save", rc);
    3479              :     }
    3480              :     
    3481              :     // exit if there is no data to save
                            // NOTE(review): if blob is non-NULL while blob_size is 0, this return does not free blob
    3482            0 :     if (blob == NULL || blob_size == 0) {
    3483            0 :         if (size) *size = 0;
    3484            0 :         return DBRES_OK;
    3485              :     }
    3486              :     
    3487              :     // write payload to file
                            // the BLOB is released right after the write attempt, whether or not it succeeded
    3488            0 :     bool res = cloudsync_file_write(payload_path, blob, (size_t)blob_size);
    3489            0 :     cloudsync_memory_free(blob);
    3490            0 :     if (res == false) {
    3491            0 :         return cloudsync_set_error(data, "Unable to write payload to file path", DBRES_IOERR);
    3492              :     }
    3493              :     
    3494              :     // returns blob size
    3495            0 :     if (size) *size = blob_size;
    3496            0 :     return DBRES_OK;
    3497            0 : }
    3498              : #endif
    3499              : 
    3500              : // MARK: - Core -
    3501              : // Validates that table `name` can be initialized: non-NULL name, length limit, table exists, primary-key rules (subject to init_flags) and NOT NULL/DEFAULT rules. Returns DBRES_OK, or the value returned by cloudsync_set_error / cloudsync_set_dberror.
    3502          294 : int cloudsync_table_sanity_check (cloudsync_context *data, const char *name, CLOUDSYNC_INIT_FLAG init_flags) {
    3503              :     DEBUG_DBFUNCTION("cloudsync_table_sanity_check %s", name);
    3504              :     char buffer[2048];
    3505              :     
    3506              :     // sanity check table name
    3507          294 :     if (name == NULL) {
    3508            1 :         return cloudsync_set_error(data, "cloudsync_init requires a non-null table parameter", DBRES_ERROR);
    3509              :     }
    3510              :     
    3511              :     // avoid allocating heap memory for SQL statements by capping table names at CLOUDSYNC_MAX_TABLENAME_LEN characters
    3512              :     // (the original note says 512 - TODO confirm against the macro). This limit is reasonable and helps prevent memory management issues.
    3513          293 :     const size_t maxlen = CLOUDSYNC_MAX_TABLENAME_LEN;
    3514          293 :     if (strlen(name) > maxlen) {
    3515            1 :         snprintf(buffer, sizeof(buffer), "Table name cannot be longer than %d characters", (int)maxlen);
    3516            1 :         return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3517              :     }
    3518              :     
    3519              :     // check if already initialized (a table already tracked in the context returns DBRES_OK here and skips every check below)
    3520          292 :     cloudsync_table_context *table = table_lookup(data, name);
    3521          292 :     if (table) return DBRES_OK;
    3522              :     
    3523              :     // check if table exists
    3524          288 :     if (database_table_exists(data, name, cloudsync_schema(data)) == false) {
    3525            2 :         snprintf(buffer, sizeof(buffer), "Table %s does not exist", name);
    3526            2 :         return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3527              :     }
    3528              :     
    3529              :     // no more than 128 columns can be used as a composite primary key (SQLite hard limit); a negative count is treated as a database error
    3530          286 :     int npri_keys = database_count_pk(data, name, false, cloudsync_schema(data));
    3531          286 :     if (npri_keys < 0) return cloudsync_set_dberror(data);
    3532          286 :     if (npri_keys > 128) return cloudsync_set_error(data, "No more than 128 columns can be used to form a composite primary key", DBRES_ERROR);
    3533              :     
    3534              :     #if CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
    3535              :     // if count == 0 means that rowid will be used as primary key (BTW: very bad choice for the user)
    3536          286 :     if (npri_keys == 0) {
    3537            1 :         snprintf(buffer, sizeof(buffer), "Rowid only tables are not supported, all primary keys must be explicitly set and declared as NOT NULL (table %s)", name);
    3538            1 :         return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3539              :     }
    3540              :     #endif
    3541              :     
    3542          285 :     bool skip_int_pk_check = (init_flags & CLOUDSYNC_INIT_FLAG_SKIP_INT_PK_CHECK) != 0;
    3543          285 :     if (!skip_int_pk_check) {
    3544          222 :         if (npri_keys == 1) { // only single-column primary keys are subject to the INTEGER check
    3545              :             // the affinity of a column is determined by the declared type of the column,
    3546              :             // according to the following rules in the order shown:
    3547              :             // 1. If the declared type contains the string "INT" then it is assigned INTEGER affinity.
    3548          134 :             int npri_keys_int = database_count_int_pk(data, name, cloudsync_schema(data));
    3549          134 :             if (npri_keys_int < 0) return cloudsync_set_dberror(data);
    3550          134 :             if (npri_keys == npri_keys_int) { // npri_keys is 1 here, so this holds when the single key column was counted as an integer key
    3551            1 :                 snprintf(buffer, sizeof(buffer), "Table %s uses a single-column INTEGER primary key. For CRDT replication, primary keys must be globally unique. Consider using a TEXT primary key with UUIDs or ULID to avoid conflicts across nodes. If you understand the risk and still want to use this INTEGER primary key, set the third argument of the cloudsync_init function to 1 to skip this check.", name);
    3552            1 :                 return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3553              :             }
    3554              :             
    3555          133 :         }
    3556          221 :     }
    3557              :         
    3558              :     // if user declared explicit primary key(s) then make sure they are all declared as NOT NULL (this section is only compiled when CLOUDSYNC_CHECK_NOTNULL_PRIKEYS is non-zero)
    3559              :     #if CLOUDSYNC_CHECK_NOTNULL_PRIKEYS
    3560              :     bool skip_notnull_prikeys_check = (init_flags & CLOUDSYNC_INIT_FLAG_SKIP_NOT_NULL_PRIKEYS_CHECK) != 0;
    3561              :     if (!skip_notnull_prikeys_check) {
    3562              :         if (npri_keys > 0) {
    3563              :             int npri_keys_notnull = database_count_pk(data, name, true, cloudsync_schema(data));
    3564              :             if (npri_keys_notnull < 0) return cloudsync_set_dberror(data);
    3565              :             if (npri_keys != npri_keys_notnull) {
    3566              :                 snprintf(buffer, sizeof(buffer), "All primary keys must be explicitly declared as NOT NULL (table %s)", name);
    3567              :                 return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3568              :             }
    3569              :         }
    3570              :     }
    3571              :     #endif
    3572              :     
    3573              :     // check for columns declared as NOT NULL without a DEFAULT value.
    3574              :     // Otherwise, col_merge_stmt would fail if changes to other columns are inserted first.
    3575          284 :     bool skip_notnull_default_check = (init_flags & CLOUDSYNC_INIT_FLAG_SKIP_NOT_NULL_DEFAULT_CHECK) != 0;
    3576          284 :     if (!skip_notnull_default_check) {
    3577          284 :         int n_notnull_nodefault = database_count_notnull_without_default(data, name, cloudsync_schema(data));
    3578          284 :         if (n_notnull_nodefault < 0) return cloudsync_set_dberror(data);
    3579          284 :         if (n_notnull_nodefault > 0) {
    3580            0 :             snprintf(buffer, sizeof(buffer), "All non-primary key columns declared as NOT NULL must have a DEFAULT value. (table %s)", name);
    3581            0 :             return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3582              :         }
    3583          284 :     }
    3584              :     
    3585          284 :     return DBRES_OK;
    3586          294 : }
    3587              : // Removes the database objects of one tracked table: drops table->meta_ref, drops table->blocks_ref when set, deletes the triggers for table->name and clears its table settings. Returns DBRES_MISUSE if the context cannot be initialized, the failing step's error otherwise, DBRES_OK on success.
    3588            4 : int cloudsync_cleanup_internal (cloudsync_context *data, cloudsync_table_context *table) {
    3589            4 :     if (cloudsync_context_init(data) == NULL) return DBRES_MISUSE;
    3590              : 
    3591              :     // drop meta-table
    3592            4 :     const char *table_name = table->name;
    3593            4 :     char *sql = cloudsync_memory_mprintf(SQL_DROP_CLOUDSYNC_TABLE, table->meta_ref); // NOTE(review): the result is not NULL-checked before database_exec
    3594            4 :     int rc = database_exec(data, sql);
    3595            4 :     cloudsync_memory_free(sql);
    3596            4 :     if (rc != DBRES_OK) {
    3597              :         char buffer[1024];
    3598            0 :         snprintf(buffer, sizeof(buffer), "Unable to drop cloudsync table %s_cloudsync in cloudsync_cleanup", table_name); // the message rebuilds the name from table_name while the statement above used table->meta_ref
    3599            0 :         return cloudsync_set_error(data, buffer, rc);
    3600              :     }
    3601              : 
    3602              :     // drop blocks table if this table has block LWW columns (i.e. table->blocks_ref is set)
    3603            4 :     if (table->blocks_ref) {
    3604            1 :         sql = cloudsync_memory_mprintf(SQL_DROP_CLOUDSYNC_TABLE, table->blocks_ref);
    3605            1 :         rc = database_exec(data, sql);
    3606            1 :         cloudsync_memory_free(sql);
    3607            1 :         if (rc != DBRES_OK) {
    3608              :             char buffer[1024];
    3609            0 :             snprintf(buffer, sizeof(buffer), "Unable to drop blocks table %s_cloudsync_blocks in cloudsync_cleanup", table_name);
    3610            0 :             return cloudsync_set_error(data, buffer, rc);
    3611              :         }
    3612            1 :     }
    3613              : 
    3614              :     // drop original triggers
    3615            4 :     rc = database_delete_triggers(data, table_name);
    3616            4 :     if (rc != DBRES_OK) {
    3617              :         char buffer[1024];
    3618            0 :         snprintf(buffer, sizeof(buffer), "Unable to delete triggers for table %s", table_name);
    3619            0 :         return cloudsync_set_error(data, buffer, rc);
    3620              :     }
    3621              :     
    3622              :     // remove all table related settings (NULL column/key/value; the call's return value is ignored)
    3623            4 :     dbutils_table_settings_set_key_value(data, table_name, NULL, NULL, NULL);
    3624            4 :     return DBRES_OK;
    3625            4 : }
    3626              : // Cleans up one table by name: returns DBRES_OK immediately if the table is not tracked in the context; otherwise runs cloudsync_cleanup_internal, then removes and frees the in-memory table entry and refreshes or clears global state.
    3627            4 : int cloudsync_cleanup (cloudsync_context *data, const char *table_name) {
    3628            4 :     cloudsync_table_context *table = table_lookup(data, table_name);
    3629            4 :     if (!table) return DBRES_OK;
    3630              :     
    3631              :     // TODO: check what happens if cloudsync_cleanup_internal fails (not everything dropped) and the table is still in memory?
    3632              :     
    3633            4 :     int rc = cloudsync_cleanup_internal(data, table);
    3634            4 :     if (rc != DBRES_OK) return rc; // on failure the table entry is left in the context (see TODO above)
    3635              :     
    3636            4 :     int counter = table_remove(data, table); // presumably the number of tables still tracked after removal - TODO confirm
    3637            4 :     table_free(table);
    3638              :     
    3639            4 :     if (counter == 0) {
    3640              :         // cleanup database on last table (return values of both calls are ignored)
    3641            2 :         cloudsync_reset_siteid(data);
    3642            2 :         dbutils_settings_cleanup(data);
    3643            2 :     } else {
    3644            2 :         if (database_internal_table_exists(data, CLOUDSYNC_TABLE_SETTINGS_NAME) == true) { // the schema hash is only refreshed while the settings table still exists
    3645            2 :             cloudsync_update_schema_hash(data);
    3646            2 :         }
    3647              :     }
    3648              :     
    3649            4 :     return DBRES_OK;
    3650            4 : }
    3651              : // Thin wrapper: forwards to database_cleanup(data) and returns its result unchanged.
    3652            0 : int cloudsync_cleanup_all (cloudsync_context *data) {
    3653            0 :     return database_cleanup(data);
    3654              : }
    3655              : // Releases the in-memory state held by the context: removes and frees every tracked table, finalizes the four cached statements, frees current_schema, then clears those pointers and site_id[0]. Always returns 1.
    3656          484 : int cloudsync_terminate (cloudsync_context *data) {
    3657              :     // can't use a for loop here because data->tables_count is changed by table_remove; always take the last entry until none remain
    3658          740 :     while (data->tables_count > 0) {
    3659          256 :         cloudsync_table_context *t = data->tables[data->tables_count - 1];
    3660          256 :         table_remove(data, t);
    3661          256 :         table_free(t);
    3662              :     }
    3663              :     
    3664          484 :     if (data->schema_version_stmt) databasevm_finalize(data->schema_version_stmt);
    3665          484 :     if (data->data_version_stmt) databasevm_finalize(data->data_version_stmt);
    3666          484 :     if (data->db_version_stmt) databasevm_finalize(data->db_version_stmt);
    3667          484 :     if (data->getset_siteid_stmt) databasevm_finalize(data->getset_siteid_stmt);
    3668          484 :     if (data->current_schema) cloudsync_memory_free(data->current_schema);
    3669              :     
    3670          484 :     data->schema_version_stmt = NULL; // pointers are cleared so a second call does not finalize or free them again
    3671          484 :     data->data_version_stmt = NULL;
    3672          484 :     data->db_version_stmt = NULL;
    3673          484 :     data->getset_siteid_stmt = NULL;
    3674          484 :     data->current_schema = NULL;
    3675              :     
    3676              :     // reset the site_id so the cloudsync_context_init will be executed again
    3677              :     // if any other cloudsync function is called after terminate
    3678          484 :     data->site_id[0] = 0;
    3679              :     
    3680          484 :     return 1; // NOTE(review): literal 1 rather than a DBRES_* constant - confirm what callers expect
    3681              : }
    3682              : // Enables sync for table_name: runs the sanity checks, resolves the CRDT algorithm (algo_name, or CLOUDSYNC_DEFAULT_ALGO when NULL), then creates triggers and the meta-table, compiles statements, registers the table in the context and refills the meta-table. Returns DBRES_OK or an error set via cloudsync_set_error.
    3683          289 : int cloudsync_init_table (cloudsync_context *data, const char *table_name, const char *algo_name, CLOUDSYNC_INIT_FLAG init_flags) {
    3684              :     // sanity check table and its primary key(s)
    3685          289 :     int rc = cloudsync_table_sanity_check(data, table_name, init_flags);
    3686          289 :     if (rc != DBRES_OK) return rc;
    3687              :     
    3688              :     // init cloudsync_settings
    3689          287 :     if (cloudsync_context_init(data) == NULL) {
    3690            0 :         return cloudsync_set_error(data, "Unable to initialize cloudsync context", DBRES_MISUSE);
    3691              :     }
    3692              :     
    3693              :     // sanity check algo name (if exists); a NULL name falls back to CLOUDSYNC_DEFAULT_ALGO
    3694          287 :     table_algo algo_new = table_algo_none;
    3695          287 :     if (!algo_name) algo_name = CLOUDSYNC_DEFAULT_ALGO;
    3696              :     
    3697          287 :     algo_new = cloudsync_algo_from_name(algo_name);
    3698          287 :     if (algo_new == table_algo_none) { // an unrecognized name is rejected here, so algo_new is never table_algo_none past this point
    3699              :         char buffer[1024];
    3700            1 :         snprintf(buffer, sizeof(buffer), "Unknown CRDT algorithm name %s", algo_name);
    3701            1 :         return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3702              :     }
    3703              : 
    3704              :     // DWS and AWS algorithms are not yet implemented in the merge logic
    3705          286 :     if (algo_new == table_algo_crdt_dws || algo_new == table_algo_crdt_aws) {
    3706              :         char buffer[1024];
    3707            2 :         snprintf(buffer, sizeof(buffer), "CRDT algorithm %s is not yet supported", algo_name);
    3708            2 :         return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3709              :     }
    3710              :     
    3711              :     // check if table name was already augmented (reads the algorithm currently stored in the table settings)
    3712          284 :     table_algo algo_current = dbutils_table_settings_get_algo(data, table_name);
    3713              :     
    3714              :     // sanity check algorithm
    3715          284 :     if ((algo_new == algo_current) && (algo_current != table_algo_none)) {
    3716              :         // if table algorithms are the same and not none, do nothing
    3717          284 :     } else if ((algo_new == table_algo_none) && (algo_current == table_algo_none)) {
    3718              :         // nothing is written into settings because the default table_algo_crdt_cls will be used (unreachable here: algo_new == table_algo_none already returned above)
    3719            0 :         algo_new = algo_current = table_algo_crdt_cls;
    3720          256 :     } else if ((algo_new == table_algo_none) && (algo_current != table_algo_none)) {
    3721              :         // algo is already written into settings so just use it (unreachable here for the same reason as the branch above)
    3722            0 :         algo_new = algo_current;
    3723          256 :     } else if ((algo_new != table_algo_none) && (algo_current == table_algo_none)) {
    3724              :         // write table algo name in settings (the call's return value is ignored)
    3725          256 :         dbutils_table_settings_set_key_value(data, table_name, "*", "algo", algo_name);
    3726          256 :     } else {
    3727              :         // error condition: both algorithms are set and they differ
    3728            0 :         return cloudsync_set_error(data, "The function cloudsync_cleanup(table) must be called before changing a table algorithm", DBRES_MISUSE);
    3729              :     }
    3730              :     
    3731              :     // Run the following function even if table was already augmented.
    3732              :     // It is safe to call the following function multiple times, if there is nothing to update nothing will be changed.
    3733              :     // After an alter table, in contrast, all the cloudsync triggers, tables and stmts must be recreated.
    3734              :     
    3735              :     // sync algo with table (unused in this version)
    3736              :     // cloudsync_sync_table_key(data, table_name, "*", CLOUDSYNC_KEY_ALGO, crdt_algo_name(algo_new));
    3737              :     
    3738              :     // read row-level filter from settings (if any); init_filter stays NULL when the lookup fails or the stored value is empty
    3739              :     char init_filter_buf[2048];
    3740          284 :     int init_frc = dbutils_table_settings_get_value(data, table_name, "*", "filter", init_filter_buf, sizeof(init_filter_buf));
    3741          284 :     const char *init_filter = (init_frc == DBRES_OK && init_filter_buf[0]) ? init_filter_buf : NULL;
    3742              : 
    3743              :     // check triggers (from here on every failure is reported as DBRES_MISUSE; the underlying rc is not propagated)
    3744          284 :     rc = database_create_triggers(data, table_name, algo_new, init_filter);
    3745          284 :     if (rc != DBRES_OK) return cloudsync_set_error(data, "An error occurred while creating triggers", DBRES_MISUSE);
    3746              :     
    3747              :     // check meta-table
    3748          284 :     rc = database_create_metatable(data, table_name);
    3749          284 :     if (rc != DBRES_OK) return cloudsync_set_error(data, "An error occurred while creating metatable", DBRES_MISUSE);
    3750              :     
    3751              :     // add prepared statements
    3752          284 :     if (cloudsync_add_dbvms(data) != DBRES_OK) {
    3753            0 :         return cloudsync_set_error(data, "An error occurred while trying to compile prepared SQL statements", DBRES_MISUSE);
    3754              :     }
    3755              :     
    3756              :     // add table to in-memory data context
    3757          284 :     if (table_add_to_context(data, algo_new, table_name) == false) {
    3758              :         char buffer[1024];
    3759            0 :         snprintf(buffer, sizeof(buffer), "An error occurred while adding %s table information to global context", table_name);
    3760            0 :         return cloudsync_set_error(data, buffer, DBRES_MISUSE);
    3761              :     }
    3762              :     
    3763          284 :     if (cloudsync_refill_metatable(data, table_name) != DBRES_OK) { // NOTE(review): on failure the table stays registered in the context - confirm this is intended
    3764            0 :         return cloudsync_set_error(data, "An error occurred while trying to fill the augmented table", DBRES_MISUSE);
    3765              :     }
    3766              :         
    3767          284 :     return DBRES_OK;
    3768          289 : }
        

Generated by: LCOV version 2.4-0