LCOV - code coverage report
Current view: top level - src - cloudsync.c (source / functions) Coverage Total Hit
Test: coverage.info Lines: 85.2 % 1986 1693
Test Date: 2026-03-16 13:50:46 Functions: 91.2 % 125 114

            Line data    Source code
       1              : //
       2              : //  cloudsync.c
       3              : //  cloudsync
       4              : //
       5              : //  Created by Marco Bambini on 16/05/24.
       6              : //
       7              : 
       8              : #include <inttypes.h>
       9              : #include <stdbool.h>
      10              : #include <limits.h>
      11              : #include <stdint.h>
      12              : #include <stdlib.h>
      13              : #include <string.h>
      14              : #include <assert.h>
      15              : #include <stdio.h>
      16              : #include <errno.h>
      17              : #include <math.h>
      18              : 
      19              : #include "cloudsync.h"
      20              : #include "lz4.h"
      21              : #include "pk.h"
      22              : #include "sql.h"
      23              : #include "utils.h"
      24              : #include "dbutils.h"
      25              : #include "block.h"
      26              : 
      27              : #ifdef _WIN32
      28              : #include <winsock2.h>
      29              : #include <ws2tcpip.h>
      30              : #else
      31              : #include <arpa/inet.h>                          // for htonl, htons, ntohl, ntohs
      32              : #include <netinet/in.h>                         // for struct sockaddr_in, INADDR_ANY, etc. (if needed)
      33              : #endif
      34              : 
      35              : #ifndef htonll
      36              : #if __BIG_ENDIAN__
      37              : #define htonll(x)                               (x)
      38              : #define ntohll(x)                               (x)
      39              : #else
      40              : #ifndef htobe64
      41              : #define htonll(x)                               ((uint64_t)htonl((x) & 0xFFFFFFFF) << 32 | (uint64_t)htonl((x) >> 32))
      42              : #define ntohll(x)                               ((uint64_t)ntohl((x) & 0xFFFFFFFF) << 32 | (uint64_t)ntohl((x) >> 32))
      43              : #else
      44              : #define htonll(x)                               htobe64(x)
      45              : #define ntohll(x)                               be64toh(x)
      46              : #endif
      47              : #endif
      48              : #endif
      49              : 
      50              : #define CLOUDSYNC_INIT_NTABLES                  64
      51              : #define CLOUDSYNC_MIN_DB_VERSION                0
      52              : 
      53              : #define CLOUDSYNC_PAYLOAD_MINBUF_SIZE                   (512*1024)
      54              : #define CLOUDSYNC_PAYLOAD_SIGNATURE                     0x434C5359  /* 'C','L','S','Y' */
      55              : #define CLOUDSYNC_PAYLOAD_VERSION_ORIGNAL               1
      56              : #define CLOUDSYNC_PAYLOAD_VERSION_1                     CLOUDSYNC_PAYLOAD_VERSION_ORIGNAL
      57              : #define CLOUDSYNC_PAYLOAD_VERSION_2                     2
      58              : #define CLOUDSYNC_PAYLOAD_VERSION_LATEST                CLOUDSYNC_PAYLOAD_VERSION_2
      59              : #define CLOUDSYNC_PAYLOAD_MIN_VERSION_WITH_CHECKSUM     CLOUDSYNC_PAYLOAD_VERSION_2
      60              : 
      61              : #ifndef MAX
      62              : #define MAX(a, b)                               (((a)>(b))?(a):(b))
      63              : #endif
      64              : 
      65              : #define DEBUG_DBERROR(_rc, _fn, _data)   do {if (_rc != DBRES_OK) printf("Error in %s: %s\n", _fn, database_errmsg(_data));} while (0)
      66              : 
      67              : typedef enum {
      68              :     CLOUDSYNC_PK_INDEX_TBL          = 0,
      69              :     CLOUDSYNC_PK_INDEX_PK           = 1,
      70              :     CLOUDSYNC_PK_INDEX_COLNAME      = 2,
      71              :     CLOUDSYNC_PK_INDEX_COLVALUE     = 3,
      72              :     CLOUDSYNC_PK_INDEX_COLVERSION   = 4,
      73              :     CLOUDSYNC_PK_INDEX_DBVERSION    = 5,
      74              :     CLOUDSYNC_PK_INDEX_SITEID       = 6,
      75              :     CLOUDSYNC_PK_INDEX_CL           = 7,
      76              :     CLOUDSYNC_PK_INDEX_SEQ          = 8
      77              : } CLOUDSYNC_PK_INDEX;
      78              : 
      79              : typedef enum {
      80              :     DBVM_VALUE_ERROR      = -1,
      81              :     DBVM_VALUE_UNCHANGED  = 0,
      82              :     DBVM_VALUE_CHANGED    = 1,
      83              : } DBVM_VALUE;
      84              : 
      85              : #define SYNCBIT_SET(_data)                  _data->insync = 1
      86              : #define SYNCBIT_RESET(_data)                _data->insync = 0
      87              : 
      88              : // MARK: - Deferred column-batch merge -
      89              : 
      90              : typedef struct {
      91              :     const char  *col_name;       // pointer into table_context->col_name[idx] (stable)
      92              :     dbvalue_t   *col_value;      // duplicated via database_value_dup (owned)
      93              :     int64_t     col_version;
      94              :     int64_t     db_version;
      95              :     uint8_t     site_id[UUID_LEN];
      96              :     int         site_id_len;
      97              :     int64_t     seq;
      98              : } merge_pending_entry;
      99              : 
     100              : typedef struct {
     101              :     cloudsync_table_context *table;
     102              :     char        *pk;             // malloc'd copy, freed on flush
     103              :     int         pk_len;
     104              :     int64_t     cl;
     105              :     bool        sentinel_pending;
     106              :     bool        row_exists;       // true when the PK already exists locally
     107              :     int         count;
     108              :     int         capacity;
     109              :     merge_pending_entry *entries;
     110              : 
     111              :     // Statement cache — reuse the prepared statement when the column
     112              :     // combination and row_exists flag match between consecutive PK flushes.
     113              :     dbvm_t      *cached_vm;
     114              :     bool        cached_row_exists;
     115              :     int         cached_col_count;
     116              :     const char  **cached_col_names; // array of pointers into table_context (not owned)
     117              : } merge_pending_batch;
     118              : 
     119              : // MARK: -
     120              : 
     121              : struct cloudsync_pk_decode_bind_context {
     122              :     dbvm_t      *vm;
     123              :     char        *tbl;
     124              :     int64_t     tbl_len;
     125              :     const void  *pk;
     126              :     int64_t     pk_len;
     127              :     char        *col_name;
     128              :     int64_t     col_name_len;
     129              :     int64_t     col_version;
     130              :     int64_t     db_version;
     131              :     const void  *site_id;
     132              :     int64_t     site_id_len;
     133              :     int64_t     cl;
     134              :     int64_t     seq;
     135              : };
     136              : 
     137              : struct cloudsync_context {
     138              :     void        *db;
     139              :     char        errmsg[1024];
     140              :     int         errcode;
     141              :     
     142              :     char        *libversion;
     143              :     uint8_t     site_id[UUID_LEN];
     144              :     int         insync;
     145              :     int         debug;
     146              :     bool        merge_equal_values;
     147              :     void        *aux_data;
     148              :     
     149              :     // stmts and context values
     150              :     dbvm_t      *schema_version_stmt;
     151              :     dbvm_t      *data_version_stmt;
     152              :     dbvm_t      *db_version_stmt;
     153              :     dbvm_t      *getset_siteid_stmt;
     154              :     int         data_version;
     155              :     int         schema_version;
     156              :     uint64_t    schema_hash;
     157              :     
     158              :     // set at transaction start and reset on commit/rollback
     159              :     int64_t    db_version;
     160              :     // version the DB would have if the transaction committed now
     161              :     int64_t    pending_db_version;
     162              :     // used to set an order inside each transaction
     163              :     int        seq;
     164              :     
     165              :     // optional schema_name to be set in the cloudsync_table_context
     166              :     char       *current_schema;
     167              :     
     168              :     // augmented tables are stored in-memory so we do not need to retrieve information about
     169              :     // col_names and cid from the disk each time a write statement is performed
     170              :     // we do also not need to use an hash map here because for few tables the direct
     171              :     // in-memory comparison with table name is faster
     172              :     cloudsync_table_context **tables;   // dense vector: [0..tables_count-1] are valid
     173              :     int tables_count;                   // size
     174              :     int tables_cap;                     // capacity
     175              : 
     176              :     int skip_decode_idx;                // -1 in sqlite, col_value index in postgresql
     177              : 
     178              :     // deferred column-batch merge (active during payload_apply)
     179              :     merge_pending_batch *pending_batch;
     180              : };
     181              : 
     182              : struct cloudsync_table_context {
     183              :     table_algo  algo;                           // CRDT algoritm associated to the table
     184              :     char        *name;                          // table name
     185              :     char        *schema;                        // table schema
     186              :     char        *meta_ref;                      // schema-qualified meta table name (e.g. "schema"."name_cloudsync")
     187              :     char        *base_ref;                      // schema-qualified base table name (e.g. "schema"."name")
     188              :     char        **col_name;                     // array of column names
     189              :     dbvm_t      **col_merge_stmt;               // array of merge insert stmt (indexed by col_name)
     190              :     dbvm_t      **col_value_stmt;               // array of column value stmt (indexed by col_name)
     191              :     int         *col_id;                        // array of column id
     192              :     col_algo_t  *col_algo;                      // per-column algorithm (normal or block)
     193              :     char        **col_delimiter;                // per-column delimiter for block splitting (NULL = default "\n")
     194              :     bool        has_block_cols;                 // quick check: does this table have any block columns?
     195              :     dbvm_t      *block_value_read_stmt;         // SELECT col_value FROM blocks table
     196              :     dbvm_t      *block_value_write_stmt;        // INSERT OR REPLACE into blocks table
     197              :     dbvm_t      *block_value_delete_stmt;       // DELETE from blocks table
     198              :     dbvm_t      *block_list_stmt;               // SELECT block entries for materialization
     199              :     char        *blocks_ref;                    // schema-qualified blocks table name
     200              :     int         ncols;                          // number of non primary key cols
     201              :     int         npks;                           // number of primary key cols
     202              :     bool        enabled;                        // flag to check if a table is enabled or disabled
     203              :     #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
     204              :     bool        rowid_only;                     // a table with no primary keys other than the implicit rowid
     205              :     #endif
     206              :     
     207              :     char        **pk_name;                      // array of primary key names
     208              : 
     209              :     // precompiled statements
     210              :     dbvm_t      *meta_pkexists_stmt;            // check if a primary key already exist in the augmented table
     211              :     dbvm_t      *meta_sentinel_update_stmt;     // update a local sentinel row
     212              :     dbvm_t      *meta_sentinel_insert_stmt;     // insert a local sentinel row
     213              :     dbvm_t      *meta_row_insert_update_stmt;   // insert/update a local row
     214              :     dbvm_t      *meta_row_drop_stmt;            // delete rows from meta
     215              :     dbvm_t      *meta_update_move_stmt;         // update rows in meta when pk changes
     216              :     dbvm_t      *meta_local_cl_stmt;            // compute local cl value
     217              :     dbvm_t      *meta_winner_clock_stmt;        // get the rowid of the last inserted/updated row in the meta table
     218              :     dbvm_t      *meta_merge_delete_drop;
     219              :     dbvm_t      *meta_zero_clock_stmt;
     220              :     dbvm_t      *meta_col_version_stmt;
     221              :     dbvm_t      *meta_site_id_stmt;
     222              :     
     223              :     dbvm_t      *real_col_values_stmt;          // retrieve all column values based on pk
     224              :     dbvm_t      *real_merge_delete_stmt;
     225              :     dbvm_t      *real_merge_sentinel_stmt;
     226              :     
     227              :     // context
     228              :     cloudsync_context *context;
     229              : };
     230              : 
     231              : struct cloudsync_payload_context {
     232              :     char        *buffer;
     233              :     size_t      bsize;
     234              :     size_t      balloc;
     235              :     size_t      bused;
     236              :     uint64_t    nrows;
     237              :     uint16_t    ncols;
     238              : };
     239              : 
     240              : #ifdef _MSC_VER
     241              :     #pragma pack(push, 1) // For MSVC: pack struct with 1-byte alignment
     242              :     #define PACKED
     243              : #else
     244              :     #define PACKED __attribute__((__packed__))
     245              : #endif
     246              : 
     247              : typedef struct PACKED {
     248              :     uint32_t    signature;          // 'CLSY'
     249              :     uint8_t     version;            // protocol version
     250              :     uint8_t     libversion[3];      // major.minor.patch
     251              :     uint32_t    expanded_size;
     252              :     uint16_t    ncols;
     253              :     uint32_t    nrows;
     254              :     uint64_t    schema_hash;
     255              :     uint8_t     checksum[6];        // 48 bits checksum (to ensure struct is 32 bytes)
     256              : } cloudsync_payload_header;
     257              : 
     258              : #ifdef _MSC_VER
     259              :     #pragma pack(pop)
     260              : #endif
     261              : 
     262              : #if CLOUDSYNC_UNITTEST
     263              : bool force_uncompressed_blob = false;
     264              : #define CHECK_FORCE_UNCOMPRESSED_BUFFER()   if (force_uncompressed_blob) use_uncompressed_buffer = true
     265              : #else
     266              : #define CHECK_FORCE_UNCOMPRESSED_BUFFER()
     267              : #endif
     268              : 
     269              : // Internal prototypes
     270              : int local_mark_insert_or_update_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const char *col_name, int64_t db_version, int seq);
     271              : 
     272              : // MARK: - CRDT algos -
     273              : 
     274          513 : table_algo cloudsync_algo_from_name (const char *algo_name) {
     275          513 :     if (algo_name == NULL) return table_algo_none;
     276              :     
     277          513 :     if ((strcasecmp(algo_name, "CausalLengthSet") == 0) || (strcasecmp(algo_name, "cls") == 0)) return table_algo_crdt_cls;
     278          224 :     if ((strcasecmp(algo_name, "GrowOnlySet") == 0) || (strcasecmp(algo_name, "gos") == 0)) return table_algo_crdt_gos;
     279          217 :     if ((strcasecmp(algo_name, "DeleteWinsSet") == 0) || (strcasecmp(algo_name, "dws") == 0)) return table_algo_crdt_dws;
     280          217 :     if ((strcasecmp(algo_name, "AddWinsSet") == 0) || (strcasecmp(algo_name, "aws") == 0)) return table_algo_crdt_aws;
     281              :     
     282              :     // if nothing is found
     283          217 :     return table_algo_none;
     284          513 : }
     285              : 
     286          110 : const char *cloudsync_algo_name (table_algo algo) {
     287          110 :     switch (algo) {
     288          105 :         case table_algo_crdt_cls: return "cls";
     289            1 :         case table_algo_crdt_gos: return "gos";
     290            1 :         case table_algo_crdt_dws: return "dws";
     291            1 :         case table_algo_crdt_aws: return "aws";
     292            1 :         case table_algo_none: return NULL;
     293              :     }
     294            1 :     return NULL;
     295          110 : }
     296              : 
     297              : // MARK: - DBVM Utils -
     298              : 
     299        31455 : DBVM_VALUE dbvm_execute (dbvm_t *stmt, cloudsync_context *data) {
     300        31455 :     if (!stmt) return DBVM_VALUE_ERROR;
     301              : 
     302        31455 :     int rc = databasevm_step(stmt);
     303        31455 :     if (rc != DBRES_ROW && rc != DBRES_DONE) {
     304            2 :         if (data) DEBUG_DBERROR(rc, "stmt_execute", data);
     305            2 :         databasevm_reset(stmt);
     306            2 :         return DBVM_VALUE_ERROR;
     307              :     }
     308              : 
     309        31453 :     DBVM_VALUE result = DBVM_VALUE_CHANGED;
     310        31453 :     if (stmt == data->data_version_stmt) {
     311        29319 :         int version = (int)database_column_int(stmt, 0);
     312        29319 :         if (version != data->data_version) {
     313          185 :             data->data_version = version;
     314          185 :         } else {
     315        29134 :             result = DBVM_VALUE_UNCHANGED;
     316              :         }
     317        31453 :     } else if (stmt == data->schema_version_stmt) {
     318         1067 :         int version = (int)database_column_int(stmt, 0);
     319         1067 :         if (version > data->schema_version) {
     320          313 :             data->schema_version = version;
     321          313 :         } else {
     322          754 :             result = DBVM_VALUE_UNCHANGED;
     323              :         }
     324              :         
     325         2134 :     } else if (stmt == data->db_version_stmt) {
     326         1067 :         data->db_version = (rc == DBRES_DONE) ? CLOUDSYNC_MIN_DB_VERSION : database_column_int(stmt, 0);
     327         1067 :     }
     328              :     
     329        31453 :     databasevm_reset(stmt);
     330        31453 :     return result;
     331        31455 : }
     332              : 
     333         3514 : int dbvm_count (dbvm_t *stmt, const char *value, size_t len, int type) {
     334         3514 :     int result = -1;
     335         3514 :     int rc = DBRES_OK;
     336              :     
     337         3514 :     if (value) {
     338         3513 :         rc = (type == DBTYPE_TEXT) ? databasevm_bind_text(stmt, 1, value, (int)len) : databasevm_bind_blob(stmt, 1, value, len);
     339         3513 :         if (rc != DBRES_OK) goto cleanup;
     340         3513 :     }
     341              : 
     342         3514 :     rc = databasevm_step(stmt);
     343         7028 :     if (rc == DBRES_DONE) {
     344            1 :         result = 0;
     345            1 :         rc = DBRES_OK;
     346         3514 :     } else if (rc == DBRES_ROW) {
     347         3513 :         result = (int)database_column_int(stmt, 0);
     348         3513 :         rc = DBRES_OK;
     349         3513 :     }
     350              :     
     351              : cleanup:
     352         3514 :     databasevm_reset(stmt);
     353         3514 :     return result;
     354              : }
     355              : 
     356       254869 : void dbvm_reset (dbvm_t *stmt) {
     357       254869 :     if (!stmt) return;
     358       231934 :     databasevm_clear_bindings(stmt);
     359       231934 :     databasevm_reset(stmt);
     360       254869 : }
     361              : 
     362              : // MARK: - Database Version -
     363              : 
     364          557 : char *cloudsync_dbversion_build_query (cloudsync_context *data) {
     365              :     // this function must be manually called each time tables changes
     366              :     // because the query plan changes too and it must be re-prepared
     367              :     // unfortunately there is no other way
     368              :     
     369              :     // we need to execute a query like:
     370              :     /*
     371              :      SELECT max(version) as version FROM (
     372              :          SELECT max(db_version) as version FROM "table1_cloudsync"
     373              :          UNION ALL
     374              :          SELECT max(db_version) as version FROM "table2_cloudsync"
     375              :          UNION ALL
     376              :          SELECT max(db_version) as version FROM "table3_cloudsync"
     377              :          UNION
     378              :          SELECT value as version FROM cloudsync_settings WHERE key = 'pre_alter_dbversion'
     379              :      )
     380              :      */
     381              :     
     382              :     // the good news is that the query can be computed in SQLite without the need to do any extra computation from the host language
     383              :     
     384          557 :     char *value = NULL;
     385          557 :     int rc = database_select_text(data, SQL_DBVERSION_BUILD_QUERY, &value);
     386          557 :     return (rc == DBRES_OK) ? value : NULL;
     387              : }
     388              : 
     389          743 : int cloudsync_dbversion_rebuild (cloudsync_context *data) {
     390          743 :     if (data->db_version_stmt) {
     391          371 :         databasevm_finalize(data->db_version_stmt);
     392          371 :         data->db_version_stmt = NULL;
     393          371 :     }
     394              :     
     395          743 :     int64_t count = dbutils_table_settings_count_tables(data);
     396          743 :     if (count == 0) return DBRES_OK;
     397          557 :     else if (count == -1) return cloudsync_set_dberror(data);
     398              :     
     399          557 :     char *sql = cloudsync_dbversion_build_query(data);
     400          557 :     if (!sql) return DBRES_NOMEM;
     401              :     DEBUG_SQL("db_version_stmt: %s", sql);
     402              :     
     403          557 :     int rc = databasevm_prepare(data, sql, (void **)&data->db_version_stmt, DBFLAG_PERSISTENT);
     404              :     DEBUG_STMT("db_version_stmt %p", data->db_version_stmt);
     405          557 :     cloudsync_memory_free(sql);
     406          557 :     return rc;
     407          743 : }
     408              : 
     409         1067 : int cloudsync_dbversion_rerun (cloudsync_context *data) {
     410         1067 :     DBVM_VALUE schema_changed = dbvm_execute(data->schema_version_stmt, data);
     411         1067 :     if (schema_changed == DBVM_VALUE_ERROR) return -1;
     412              : 
     413         1067 :     if (schema_changed == DBVM_VALUE_CHANGED) {
     414          313 :         int rc = cloudsync_dbversion_rebuild(data);
     415          313 :         if (rc != DBRES_OK) return -1;
     416          313 :     }
     417              : 
     418         1067 :     if (!data->db_version_stmt) {
     419            0 :         data->db_version = CLOUDSYNC_MIN_DB_VERSION;
     420            0 :         return 0;
     421              :     }
     422              : 
     423         1067 :     DBVM_VALUE rc = dbvm_execute(data->db_version_stmt, data);
     424         1067 :     if (rc == DBVM_VALUE_ERROR) return -1;
     425         1067 :     return 0;
     426         1067 : }
     427              : 
     428        29319 : int cloudsync_dbversion_check_uptodate (cloudsync_context *data) {
     429              :     // perform a PRAGMA data_version to check if some other process write any data
     430        29319 :     DBVM_VALUE rc = dbvm_execute(data->data_version_stmt, data);
     431        29319 :     if (rc == DBVM_VALUE_ERROR) return -1;
     432              :     
     433              :     // db_version is already set and there is no need to update it
     434        29319 :     if (data->db_version != CLOUDSYNC_VALUE_NOTSET && rc == DBVM_VALUE_UNCHANGED) return 0;
     435              :     
     436         1067 :     return cloudsync_dbversion_rerun(data);
     437        29319 : }
     438              : 
     439        29295 : int64_t cloudsync_dbversion_next (cloudsync_context *data, int64_t merging_version) {
     440        29295 :     int rc = cloudsync_dbversion_check_uptodate(data);
     441        29295 :     if (rc != DBRES_OK) return -1;
     442              :     
     443        29295 :     int64_t result = data->db_version + 1;
     444        29295 :     if (result < data->pending_db_version) result = data->pending_db_version;
     445        29295 :     if (merging_version != CLOUDSYNC_VALUE_NOTSET && result < merging_version) result = merging_version;
     446        29295 :     data->pending_db_version = result;
     447              :     
     448        29295 :     return result;
     449        29295 : }
     450              : 
     451              : // MARK: - PK Context -
     452              : 
     453            0 : char *cloudsync_pk_context_tbl (cloudsync_pk_decode_bind_context *ctx, int64_t *tbl_len) {
     454            0 :     *tbl_len = ctx->tbl_len;
     455            0 :     return ctx->tbl;
     456              : }
     457              : 
     458            0 : void *cloudsync_pk_context_pk (cloudsync_pk_decode_bind_context *ctx, int64_t *pk_len) {
     459            0 :     *pk_len = ctx->pk_len;
     460            0 :     return (void *)ctx->pk;
     461              : }
     462              : 
     463            0 : char *cloudsync_pk_context_colname (cloudsync_pk_decode_bind_context *ctx, int64_t *colname_len) {
     464            0 :     *colname_len = ctx->col_name_len;
     465            0 :     return ctx->col_name;
     466              : }
     467              : 
     468            0 : int64_t cloudsync_pk_context_cl (cloudsync_pk_decode_bind_context *ctx) {
     469            0 :     return ctx->cl;
     470              : }
     471              : 
     472            0 : int64_t cloudsync_pk_context_dbversion (cloudsync_pk_decode_bind_context *ctx) {
     473            0 :     return ctx->db_version;
     474              : }
     475              : 
     476              : // MARK: - CloudSync Context -
     477              : 
     478        12547 : int cloudsync_insync (cloudsync_context *data) {
     479        12547 :     return data->insync;
     480              : }
     481              : 
     482          731 : void *cloudsync_siteid (cloudsync_context *data) {
     483          731 :     return (void *)data->site_id;
     484              : }
     485              : 
     486            1 : void cloudsync_reset_siteid (cloudsync_context *data) {
     487            1 :     memset(data->site_id, 0, sizeof(uint8_t) * UUID_LEN);
     488            1 : }
     489              : 
     490          187 : int cloudsync_load_siteid (cloudsync_context *data) {
     491              :     // check if site_id was already loaded
     492          187 :     if (data->site_id[0] != 0) return DBRES_OK;
     493              :     
     494              :     // load site_id
     495          186 :     char *buffer = NULL;
     496          186 :     int64_t size = 0;
     497          186 :     int rc = database_select_blob(data, SQL_SITEID_SELECT_ROWID0, &buffer, &size);
     498          186 :     if (rc != DBRES_OK) return rc;
     499          186 :     if (!buffer || size != UUID_LEN) {
     500            0 :         if (buffer) cloudsync_memory_free(buffer);
     501            0 :         return cloudsync_set_error(data, "Unable to retrieve siteid", DBRES_MISUSE);
     502              :     }
     503              :     
     504          186 :     memcpy(data->site_id, buffer, UUID_LEN);
     505          186 :     cloudsync_memory_free(buffer);
     506              :     
     507          186 :     return DBRES_OK;
     508          187 : }
     509              : 
     510            1 : int64_t cloudsync_dbversion (cloudsync_context *data) {
     511            1 :     return data->db_version;
     512              : }
     513              : 
     514        11937 : int cloudsync_bumpseq (cloudsync_context *data) {
     515        11937 :     int value = data->seq;
     516        11937 :     data->seq += 1;
     517        11937 :     return value;
     518              : }
     519              : 
     520          245 : void cloudsync_update_schema_hash (cloudsync_context *data) {
     521          245 :     database_update_schema_hash(data, &data->schema_hash);
     522          245 : }
     523              : 
     524        57595 : void *cloudsync_db (cloudsync_context *data) {
     525        57595 :     return data->db;
     526              : }
     527              : 
     528          430 : int cloudsync_add_dbvms (cloudsync_context *data) {
     529              :     DEBUG_DBFUNCTION("cloudsync_add_stmts");
     530              :     
     531          430 :     if (data->data_version_stmt == NULL) {
     532          186 :         int rc = databasevm_prepare(data, SQL_DATA_VERSION, (void **)&data->data_version_stmt, DBFLAG_PERSISTENT);
     533              :         DEBUG_STMT("data_version_stmt %p", data->data_version_stmt);
     534          186 :         if (rc != DBRES_OK) return rc;
     535              :         DEBUG_SQL("data_version_stmt: %s", SQL_DATA_VERSION);
     536          186 :     }
     537              :     
     538          430 :     if (data->schema_version_stmt == NULL) {
     539          186 :         int rc = databasevm_prepare(data, SQL_SCHEMA_VERSION, (void **)&data->schema_version_stmt, DBFLAG_PERSISTENT);
     540              :         DEBUG_STMT("schema_version_stmt %p", data->schema_version_stmt);
     541          186 :         if (rc != DBRES_OK) return rc;
     542              :         DEBUG_SQL("schema_version_stmt: %s", SQL_SCHEMA_VERSION);
     543          186 :     }
     544              :     
     545          430 :     if (data->getset_siteid_stmt == NULL) {
     546              :         // get and set index of the site_id
     547              :         // in SQLite, we can’t directly combine an INSERT and a SELECT to both insert a row and return an identifier (rowid) in a single statement,
     548              :         // however, we can use a workaround by leveraging the INSERT statement with ON CONFLICT DO UPDATE and then combining it with RETURNING rowid
     549          186 :         int rc = databasevm_prepare(data, SQL_SITEID_GETSET_ROWID_BY_SITEID, (void **)&data->getset_siteid_stmt, DBFLAG_PERSISTENT);
     550              :         DEBUG_STMT("getset_siteid_stmt %p", data->getset_siteid_stmt);
     551          186 :         if (rc != DBRES_OK) return rc;
     552              :         DEBUG_SQL("getset_siteid_stmt: %s", SQL_SITEID_GETSET_ROWID_BY_SITEID);
     553          186 :     }
     554              :     
     555          430 :     return cloudsync_dbversion_rebuild(data);
     556          430 : }
     557              : 
     558           17 : int cloudsync_set_error (cloudsync_context *data, const char *err_user, int err_code) {
     559              :     // force err_code to be something different than OK
     560           17 :     if (err_code == DBRES_OK) err_code = database_errcode(data);
     561           17 :     if (err_code == DBRES_OK) err_code = DBRES_ERROR;
     562              :     
     563              :     // compute a meaningful error message
     564           17 :     if (err_user == NULL) {
     565            4 :         snprintf(data->errmsg, sizeof(data->errmsg), "%s", database_errmsg(data));
     566            4 :     } else {
     567           13 :         const char *db_error = database_errmsg(data);
     568              :         char db_error_copy[sizeof(data->errmsg)];
     569           13 :         int rc = database_errcode(data);
     570           13 :         if (rc == DBRES_OK) {
     571           13 :             snprintf(data->errmsg, sizeof(data->errmsg), "%s", err_user);
     572           13 :         } else {
     573            0 :             if (db_error == data->errmsg) {
     574            0 :                 snprintf(db_error_copy, sizeof(db_error_copy), "%s", db_error);
     575            0 :                 db_error = db_error_copy;
     576            0 :             }
     577            0 :             snprintf(data->errmsg, sizeof(data->errmsg), "%s (%s)", err_user, db_error);
     578              :         }
     579              :     }
     580              :     
     581           17 :     data->errcode = err_code;
     582           17 :     return err_code;
     583              : }
     584              : 
     585            4 : int cloudsync_set_dberror (cloudsync_context *data) {
     586            4 :     return cloudsync_set_error(data, NULL, DBRES_OK);
     587              : }
     588              : 
     589            8 : const char *cloudsync_errmsg (cloudsync_context *data) {
     590            8 :     return data->errmsg;
     591              : }
     592              : 
     593            3 : int cloudsync_errcode (cloudsync_context *data) {
     594            3 :     return data->errcode;
     595              : }
     596              : 
     597            2 : void cloudsync_reset_error (cloudsync_context *data) {
     598            2 :     data->errmsg[0] = 0;
     599            2 :     data->errcode = DBRES_OK;
     600            2 : }
     601              : 
     602            3 : void *cloudsync_auxdata (cloudsync_context *data) {
     603            3 :     return data->aux_data;
     604              : }
     605              : 
     606            2 : void cloudsync_set_auxdata (cloudsync_context *data, void *xdata) {
     607            2 :     data->aux_data = xdata;
     608            2 : }
     609              : 
     610            6 : void cloudsync_set_schema (cloudsync_context *data, const char *schema) {
     611            6 :     if (data->current_schema && schema && strcmp(data->current_schema, schema) == 0) return;
     612            5 :     if (data->current_schema) cloudsync_memory_free(data->current_schema);
     613            5 :     data->current_schema = NULL;
     614            5 :     if (schema) data->current_schema = cloudsync_string_dup_lowercase(schema);
     615            6 : }
     616              : 
     617         1079 : const char *cloudsync_schema (cloudsync_context *data) {
     618         1079 :     return data->current_schema;
     619              : }
     620              : 
     621            4 : const char *cloudsync_table_schema (cloudsync_context *data, const char *table_name) {
     622            4 :     cloudsync_table_context *table = table_lookup(data, table_name);
     623            4 :     if (!table) return NULL;
     624              : 
     625            1 :     return table->schema;
     626            4 : }
     627              : 
     628              : // MARK: - Table Utils -
     629              : 
     630           69 : void table_pknames_free (char **names, int nrows) {
     631           69 :     if (!names) return;
     632          132 :     for (int i = 0; i < nrows; ++i) {cloudsync_memory_free(names[i]);}
     633           46 :     cloudsync_memory_free(names);
     634           69 : }
     635              : 
     636          240 : char *table_build_mergedelete_sql (cloudsync_table_context *table) {
     637              :     #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
     638              :     if (table->rowid_only) {
     639              :         char *sql = memory_mprintf(SQL_DELETE_ROW_BY_ROWID, table->name);
     640              :         return sql;
     641              :     }
     642              :     #endif
     643              : 
     644          240 :     return sql_build_delete_by_pk(table->context, table->name, table->schema);
     645              : }
     646              : 
     647         1258 : char *table_build_mergeinsert_sql (cloudsync_table_context *table, const char *colname) {
     648         1258 :     char *sql = NULL;
     649              :     
     650              :     #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
     651              :     if (table->rowid_only) {
     652              :         if (colname == NULL) {
     653              :             // INSERT OR IGNORE INTO customers (first_name,last_name) VALUES (?,?);
     654              :             sql = memory_mprintf(SQL_INSERT_ROWID_IGNORE, table->name);
     655              :         } else {
     656              :             // INSERT INTO customers (first_name,last_name,age) VALUES (?,?,?) ON CONFLICT DO UPDATE SET age=?;
     657              :             sql = memory_mprintf(SQL_UPSERT_ROWID_AND_COL_BY_ROWID, table->name, colname, colname);
     658              :         }
     659              :         return sql;
     660              :     }
     661              :     #endif
     662              :     
     663         1258 :     if (colname == NULL) {
     664              :         // is sentinel insert
     665          240 :         sql = sql_build_insert_pk_ignore(table->context, table->name, table->schema);
     666          240 :     } else {
     667         1018 :         sql = sql_build_upsert_pk_and_col(table->context, table->name, colname, table->schema);
     668              :     }
     669         1258 :     return sql;
     670              : }
     671              : 
     672         1018 : char *table_build_value_sql (cloudsync_table_context *table, const char *colname) {
     673              :     #if !CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
     674              :     if (table->rowid_only) {
     675              :         char *colnamequote = "\"";
     676              :         char *sql = memory_mprintf(SQL_SELECT_COLS_BY_ROWID_FMT, colnamequote, colname, colnamequote, table->name);
     677              :         return sql;
     678              :     }
     679              :     #endif
     680              :         
     681              :     // SELECT age FROM customers WHERE first_name=? AND last_name=?;
     682         1018 :     return sql_build_select_cols_by_pk(table->context, table->name, colname, table->schema);
     683              : }
     684              :     
     685          240 : cloudsync_table_context *table_create (cloudsync_context *data, const char *name, table_algo algo) {
     686              :     DEBUG_DBFUNCTION("table_create %s", name);
     687              :     
     688          240 :     cloudsync_table_context *table = (cloudsync_table_context *)cloudsync_memory_zeroalloc(sizeof(cloudsync_table_context));
     689          240 :     if (!table) return NULL;
     690              :     
     691          240 :     table->context = data;
     692          240 :     table->algo = algo;
     693          240 :     table->name = cloudsync_string_dup_lowercase(name);
     694              : 
     695              :     // Detect schema from metadata table location. If metadata table doesn't
     696              :     // exist yet (during initialization), fall back to cloudsync_schema() which
     697              :     // returns the explicitly set schema or current_schema().
     698          240 :     table->schema = database_table_schema(name);
     699          240 :     if (!table->schema) {
     700          240 :         const char *fallback_schema = cloudsync_schema(data);
     701          240 :         if (fallback_schema) {
     702            0 :             table->schema = cloudsync_string_dup(fallback_schema);
     703            0 :         }
     704          240 :     }
     705              : 
     706          240 :     if (!table->name) {
     707            0 :         cloudsync_memory_free(table);
     708            0 :         return NULL;
     709              :     }
     710          240 :     table->meta_ref = database_build_meta_ref(table->schema, table->name);
     711          240 :     table->base_ref = database_build_base_ref(table->schema, table->name);
     712          240 :     table->enabled = true;
     713              :         
     714          240 :     return table;
     715          240 : }
     716              : 
     717          240 : void table_free (cloudsync_table_context *table) {
     718              :     DEBUG_DBFUNCTION("table_free %s", (table) ? (table->name) : "NULL");
     719          240 :     if (!table) return;
     720              :     
     721          240 :     if (table->ncols > 0) {
     722          201 :         if (table->col_name) {
     723         1219 :             for (int i=0; i<table->ncols; ++i) {
     724         1018 :                 cloudsync_memory_free(table->col_name[i]);
     725         1018 :             }
     726          201 :             cloudsync_memory_free(table->col_name);
     727          201 :         }
     728          201 :         if (table->col_merge_stmt) {
     729         1219 :             for (int i=0; i<table->ncols; ++i) {
     730         1018 :                 databasevm_finalize(table->col_merge_stmt[i]);
     731         1018 :             }
     732          201 :             cloudsync_memory_free(table->col_merge_stmt);
     733          201 :         }
     734          201 :         if (table->col_value_stmt) {
     735         1219 :             for (int i=0; i<table->ncols; ++i) {
     736         1018 :                 databasevm_finalize(table->col_value_stmt[i]);
     737         1018 :             }
     738          201 :             cloudsync_memory_free(table->col_value_stmt);
     739          201 :         }
     740          201 :         if (table->col_id) {
     741          201 :             cloudsync_memory_free(table->col_id);
     742          201 :         }
     743          201 :         if (table->col_algo) {
     744          201 :             cloudsync_memory_free(table->col_algo);
     745          201 :         }
     746          201 :         if (table->col_delimiter) {
     747         1219 :             for (int i=0; i<table->ncols; ++i) {
     748         1018 :                 if (table->col_delimiter[i]) cloudsync_memory_free(table->col_delimiter[i]);
     749         1018 :             }
     750          201 :             cloudsync_memory_free(table->col_delimiter);
     751          201 :         }
     752          201 :     }
     753              : 
     754          240 :     if (table->block_value_read_stmt) databasevm_finalize(table->block_value_read_stmt);
     755          240 :     if (table->block_value_write_stmt) databasevm_finalize(table->block_value_write_stmt);
     756          240 :     if (table->block_value_delete_stmt) databasevm_finalize(table->block_value_delete_stmt);
     757          240 :     if (table->block_list_stmt) databasevm_finalize(table->block_list_stmt);
     758          240 :     if (table->blocks_ref) cloudsync_memory_free(table->blocks_ref);
     759              : 
     760          240 :     if (table->name) cloudsync_memory_free(table->name);
     761          240 :     if (table->schema) cloudsync_memory_free(table->schema);
     762          240 :     if (table->meta_ref) cloudsync_memory_free(table->meta_ref);
     763          240 :     if (table->base_ref) cloudsync_memory_free(table->base_ref);
     764          240 :     if (table->pk_name) table_pknames_free(table->pk_name, table->npks);
     765          240 :     if (table->meta_pkexists_stmt) databasevm_finalize(table->meta_pkexists_stmt);
     766          240 :     if (table->meta_sentinel_update_stmt) databasevm_finalize(table->meta_sentinel_update_stmt);
     767          240 :     if (table->meta_sentinel_insert_stmt) databasevm_finalize(table->meta_sentinel_insert_stmt);
     768          240 :     if (table->meta_row_insert_update_stmt) databasevm_finalize(table->meta_row_insert_update_stmt);
     769          240 :     if (table->meta_row_drop_stmt) databasevm_finalize(table->meta_row_drop_stmt);
     770          240 :     if (table->meta_update_move_stmt) databasevm_finalize(table->meta_update_move_stmt);
     771          240 :     if (table->meta_local_cl_stmt) databasevm_finalize(table->meta_local_cl_stmt);
     772          240 :     if (table->meta_winner_clock_stmt) databasevm_finalize(table->meta_winner_clock_stmt);
     773          240 :     if (table->meta_merge_delete_drop) databasevm_finalize(table->meta_merge_delete_drop);
     774          240 :     if (table->meta_zero_clock_stmt) databasevm_finalize(table->meta_zero_clock_stmt);
     775          240 :     if (table->meta_col_version_stmt) databasevm_finalize(table->meta_col_version_stmt);
     776          240 :     if (table->meta_site_id_stmt) databasevm_finalize(table->meta_site_id_stmt);
     777              :     
     778          240 :     if (table->real_col_values_stmt) databasevm_finalize(table->real_col_values_stmt);
     779          240 :     if (table->real_merge_delete_stmt) databasevm_finalize(table->real_merge_delete_stmt);
     780          240 :     if (table->real_merge_sentinel_stmt) databasevm_finalize(table->real_merge_sentinel_stmt);
     781              :     
     782          240 :     cloudsync_memory_free(table);
     783          240 : }
     784              : 
     785          240 : int table_add_stmts (cloudsync_table_context *table, int ncols) {
     786          240 :     int rc = DBRES_OK;
     787          240 :     char *sql = NULL;
     788          240 :     cloudsync_context *data = table->context;
     789              :     
     790              :     // META TABLE statements
     791              :     
     792              :     // CREATE TABLE IF NOT EXISTS \"%w_cloudsync\" (pk BLOB NOT NULL, col_name TEXT NOT NULL, col_version INTEGER, db_version INTEGER, site_id INTEGER DEFAULT 0, seq INTEGER, PRIMARY KEY (pk, col_name));
     793              :     
     794              :     // precompile the pk exists statement
     795              :     // we do not need an index on the pk column because it is already covered by the fact that it is part of the prikeys
     796              :     // EXPLAIN QUERY PLAN reports: SEARCH table_name USING PRIMARY KEY (pk=?)
     797          240 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_ROW_EXISTS_BY_PK, table->meta_ref);
     798          240 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     799              :     DEBUG_SQL("meta_pkexists_stmt: %s", sql);
     800              : 
     801          240 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_pkexists_stmt, DBFLAG_PERSISTENT);
     802          240 :     cloudsync_memory_free(sql);
     803          240 :     if (rc != DBRES_OK) goto cleanup;
     804              : 
     805              :     // precompile the update local sentinel statement
     806          240 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPDATE_COL_BUMP_VERSION, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
     807          240 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     808              :     DEBUG_SQL("meta_sentinel_update_stmt: %s", sql);
     809              :     
     810          240 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_sentinel_update_stmt, DBFLAG_PERSISTENT);
     811          240 :     cloudsync_memory_free(sql);
     812          240 :     if (rc != DBRES_OK) goto cleanup;
     813              :     
     814              :     // precompile the insert local sentinel statement
     815          240 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPSERT_COL_INIT_OR_BUMP_VERSION, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, table->meta_ref, table->meta_ref, table->meta_ref);
     816          240 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     817              :     DEBUG_SQL("meta_sentinel_insert_stmt: %s", sql);
     818              :     
     819          240 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_sentinel_insert_stmt, DBFLAG_PERSISTENT);
     820          240 :     cloudsync_memory_free(sql);
     821          240 :     if (rc != DBRES_OK) goto cleanup;
     822              : 
     823              :     // precompile the insert/update local row statement
     824          240 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPSERT_RAW_COLVERSION, table->meta_ref, table->meta_ref);
     825          240 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     826              :     DEBUG_SQL("meta_row_insert_update_stmt: %s", sql);
     827              :     
     828          240 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_row_insert_update_stmt, DBFLAG_PERSISTENT);
     829          240 :     cloudsync_memory_free(sql);
     830          240 :     if (rc != DBRES_OK) goto cleanup;
     831              :     
     832              :     // precompile the delete rows from meta
     833          240 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_DELETE_PK_EXCEPT_COL, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
     834          240 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     835              :     DEBUG_SQL("meta_row_drop_stmt: %s", sql);
     836              : 
     837          240 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_row_drop_stmt, DBFLAG_PERSISTENT);
     838          240 :     cloudsync_memory_free(sql);
     839          240 :     if (rc != DBRES_OK) goto cleanup;
     840              :     
     841              :     // precompile the update rows from meta when pk changes
     842              :     // see https://github.com/sqliteai/sqlite-sync/blob/main/docs/PriKey.md for more details
     843          240 :     sql = sql_build_rekey_pk_and_reset_version_except_col(data, table->name, CLOUDSYNC_TOMBSTONE_VALUE);
     844          240 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     845              :     DEBUG_SQL("meta_update_move_stmt: %s", sql);
     846              :     
     847          240 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_update_move_stmt, DBFLAG_PERSISTENT);
     848          240 :     cloudsync_memory_free(sql);
     849          240 :     if (rc != DBRES_OK) goto cleanup;
     850              :     
     851              :     // local cl
     852          240 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_GET_COL_VERSION_OR_ROW_EXISTS, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, table->meta_ref);
     853          240 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     854              :     DEBUG_SQL("meta_local_cl_stmt: %s", sql);
     855              :     
     856          240 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_local_cl_stmt, DBFLAG_PERSISTENT);
     857          240 :     cloudsync_memory_free(sql);
     858          240 :     if (rc != DBRES_OK) goto cleanup;
     859              :     
     860              :     // rowid of the last inserted/updated row in the meta table
     861          240 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_INSERT_RETURN_CHANGE_ID, table->meta_ref);
     862          240 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     863              :     DEBUG_SQL("meta_winner_clock_stmt: %s", sql);
     864              :     
     865          240 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_winner_clock_stmt, DBFLAG_PERSISTENT);
     866          240 :     cloudsync_memory_free(sql);
     867          240 :     if (rc != DBRES_OK) goto cleanup;
     868              :     
     869          240 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_DELETE_PK_EXCEPT_COL, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
     870          240 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     871              :     DEBUG_SQL("meta_merge_delete_drop: %s", sql);
     872              : 
     873          240 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_merge_delete_drop, DBFLAG_PERSISTENT);
     874          240 :     cloudsync_memory_free(sql);
     875          240 :     if (rc != DBRES_OK) goto cleanup;
     876              :     
     877              :     // zero clock
     878          240 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_TOMBSTONE_PK_EXCEPT_COL, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
     879          240 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     880              :     DEBUG_SQL("meta_zero_clock_stmt: %s", sql);
     881              :     
     882          240 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_zero_clock_stmt, DBFLAG_PERSISTENT);
     883          240 :     cloudsync_memory_free(sql);
     884          240 :     if (rc != DBRES_OK) goto cleanup;
     885              :     
     886              :     // col_version
     887          240 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_COL_VERSION_BY_PK_COL, table->meta_ref);
     888          240 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     889              :     DEBUG_SQL("meta_col_version_stmt: %s", sql);
     890              :     
     891          240 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_col_version_stmt, DBFLAG_PERSISTENT);
     892          240 :     cloudsync_memory_free(sql);
     893          240 :     if (rc != DBRES_OK) goto cleanup;
     894              :     
     895              :     // site_id
     896          240 :     sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_SITE_ID_BY_PK_COL, table->meta_ref);
     897          240 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     898              :     DEBUG_SQL("meta_site_id_stmt: %s", sql);
     899              :     
     900          240 :     rc = databasevm_prepare(data, sql, (void **)&table->meta_site_id_stmt, DBFLAG_PERSISTENT);
     901          240 :     cloudsync_memory_free(sql);
     902          240 :     if (rc != DBRES_OK) goto cleanup;
     903              :     
     904              :     // REAL TABLE statements
     905              : 
     906              :     // precompile the get column value statement
     907          240 :     if (ncols > 0) {
     908          201 :         sql = sql_build_select_nonpk_by_pk(data, table->name, table->schema);
     909          201 :         if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     910              :         DEBUG_SQL("real_col_values_stmt: %s", sql);
     911              :         
     912          201 :         rc = databasevm_prepare(data, sql, (void **)&table->real_col_values_stmt, DBFLAG_PERSISTENT);
     913          201 :         cloudsync_memory_free(sql);
     914          201 :         if (rc != DBRES_OK) goto cleanup;
     915          201 :     }
     916              :     
     917          240 :     sql = table_build_mergedelete_sql(table);
     918          240 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     919              :     DEBUG_SQL("real_merge_delete: %s", sql);
     920              :     
     921          240 :     rc = databasevm_prepare(data, sql, (void **)&table->real_merge_delete_stmt, DBFLAG_PERSISTENT);
     922          240 :     cloudsync_memory_free(sql);
     923          240 :     if (rc != DBRES_OK) goto cleanup;
     924              :     
     925          240 :     sql = table_build_mergeinsert_sql(table, NULL);
     926          240 :     if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
     927              :     DEBUG_SQL("real_merge_sentinel: %s", sql);
     928              :     
     929          240 :     rc = databasevm_prepare(data, sql, (void **)&table->real_merge_sentinel_stmt, DBFLAG_PERSISTENT);
     930          240 :     cloudsync_memory_free(sql);
     931          240 :     if (rc != DBRES_OK) goto cleanup;
     932              :     
     933              : cleanup:
     934          240 :     if (rc != DBRES_OK) DEBUG_ALWAYS("table_add_stmts error: %d %s\n", rc, database_errmsg(data));
     935          240 :     return rc;
     936              : }
     937              : 
     938       183416 : cloudsync_table_context *table_lookup (cloudsync_context *data, const char *table_name) {
     939              :     DEBUG_DBFUNCTION("table_lookup %s", table_name);
     940              :     
     941       183416 :     if (table_name) {
     942       184448 :         for (int i=0; i<data->tables_count; ++i) {
     943       183955 :             if ((strcasecmp(data->tables[i]->name, table_name) == 0)) return data->tables[i];
     944         1032 :         }
     945          493 :     }
     946              :     
     947          493 :     return NULL;
     948       183416 : }
     949              : 
     950       168954 : void *table_column_lookup (cloudsync_table_context *table, const char *col_name, bool is_merge, int *index) {
     951              :     DEBUG_DBFUNCTION("table_column_lookup %s", col_name);
     952              :     
     953       314989 :     for (int i=0; i<table->ncols; ++i) {
     954       314989 :         if (strcasecmp(table->col_name[i], col_name) == 0) {
     955       168954 :             if (index) *index = i;
     956       168954 :             return (is_merge) ? table->col_merge_stmt[i] : table->col_value_stmt[i];
     957              :         }
     958       146035 :     }
     959              :     
     960            0 :     if (index) *index = -1;
     961            0 :     return NULL;
     962       168954 : }
     963              : 
     964          240 : int table_remove (cloudsync_context *data, cloudsync_table_context *table) {
     965          240 :     const char *table_name = table->name;
     966              :     DEBUG_DBFUNCTION("table_remove %s", table_name);
     967              :     
     968          282 :     for (int i = 0; i < data->tables_count; ++i) {
     969          282 :         cloudsync_table_context *t = data->tables[i];
     970              :         
     971              :         // pointer compare is fastest but fallback to strcasecmp if not same pointer
     972          282 :         if ((t == table) || ((strcasecmp(t->name, table_name) == 0))) {
     973          240 :             int last = data->tables_count - 1;
     974          240 :             data->tables[i] = data->tables[last];   // move last into the hole (keeps array dense)
     975          240 :             data->tables[last] = NULL;              // NULLify tail (as an extra security measure)
     976          240 :             data->tables_count--;
     977          240 :             return data->tables_count;
     978              :         }
     979           42 :     }
     980              :     
     981            0 :     return -1;
     982          240 : }
     983              : 
     984         1018 : int table_add_to_context_cb (void *xdata, int ncols, char **values, char **names) {
     985         1018 :     cloudsync_table_context *table = (cloudsync_table_context *)xdata;
     986         1018 :     cloudsync_context *data = table->context;
     987              : 
     988         1018 :     int index = table->ncols;
     989         2036 :     for (int i=0; i<ncols; i+=2) {
     990         1018 :         const char *name = values[i];
     991         1018 :         int cid = (int)strtol(values[i+1], NULL, 0);
     992              : 
     993         1018 :         table->col_id[index] = cid;
     994         1018 :         table->col_name[index] = cloudsync_string_dup_lowercase(name);
     995         1018 :         if (!table->col_name[index]) goto error;
     996              : 
     997         1018 :         char *sql = table_build_mergeinsert_sql(table, name);
     998         1018 :         if (!sql) goto error;
     999              :         DEBUG_SQL("col_merge_stmt[%d]: %s", index, sql);
    1000              : 
    1001         1018 :         int rc = databasevm_prepare(data, sql, (void **)&table->col_merge_stmt[index], DBFLAG_PERSISTENT);
    1002         1018 :         cloudsync_memory_free(sql);
    1003         1018 :         if (rc != DBRES_OK) goto error;
    1004         1018 :         if (!table->col_merge_stmt[index]) goto error;
    1005              : 
    1006         1018 :         sql = table_build_value_sql(table, name);
    1007         1018 :         if (!sql) goto error;
    1008              :         DEBUG_SQL("col_value_stmt[%d]: %s", index, sql);
    1009              : 
    1010         1018 :         rc = databasevm_prepare(data, sql, (void **)&table->col_value_stmt[index], DBFLAG_PERSISTENT);
    1011         1018 :         cloudsync_memory_free(sql);
    1012         1018 :         if (rc != DBRES_OK) goto error;
    1013         1018 :         if (!table->col_value_stmt[index]) goto error;
    1014         1018 :     }
    1015         1018 :     table->ncols += 1;
    1016              : 
    1017         1018 :     return 0;
    1018              : 
    1019              : error:
    1020              :     // clean up partially-initialized entry at index
    1021            0 :     if (table->col_name[index]) {cloudsync_memory_free(table->col_name[index]); table->col_name[index] = NULL;}
    1022            0 :     if (table->col_merge_stmt[index]) {databasevm_finalize(table->col_merge_stmt[index]); table->col_merge_stmt[index] = NULL;}
    1023            0 :     if (table->col_value_stmt[index]) {databasevm_finalize(table->col_value_stmt[index]); table->col_value_stmt[index] = NULL;}
    1024            0 :     return 1;
    1025         1018 : }
    1026              : 
    1027          240 : bool table_ensure_capacity (cloudsync_context *data) {
    1028          240 :     if (data->tables_count < data->tables_cap) return true;
    1029              :     
    1030            0 :     int new_cap = data->tables_cap ? data->tables_cap * 2 : CLOUDSYNC_INIT_NTABLES;
    1031            0 :     size_t bytes = (size_t)new_cap * sizeof(*data->tables);
    1032            0 :     void *p = cloudsync_memory_realloc(data->tables, bytes);
    1033            0 :     if (!p) return false;
    1034              :     
    1035            0 :     data->tables = (cloudsync_table_context **)p;
    1036            0 :     data->tables_cap = new_cap;
    1037            0 :     return true;
    1038          240 : }
    1039              : 
    1040          244 : bool table_add_to_context (cloudsync_context *data, table_algo algo, const char *table_name) {
    1041              :     DEBUG_DBFUNCTION("cloudsync_context_add_table %s", table_name);
    1042              : 
    1043              :     // Check if table already initialized in this connection's context.
    1044              :     // Note: This prevents same-connection duplicate initialization.
    1045              :     // SQLite clients cannot distinguish schemas, so having 'public.users'
    1046              :     // and 'auth.users' would cause sync ambiguity. Users should avoid
    1047              :     // initializing tables with the same name in different schemas.
    1048              :     // If two concurrent connections initialize tables with the same name
    1049              :     // in different schemas, the behavior is undefined.
    1050          244 :     cloudsync_table_context *table = table_lookup(data, table_name);
    1051          244 :     if (table) return true;
    1052              :     
    1053              :     // check for space availability
    1054          240 :     if (!table_ensure_capacity(data)) return false;
    1055              :     
    1056              :     // setup a new table
    1057          240 :     table = table_create(data, table_name, algo);
    1058          240 :     if (!table) return false;
    1059              :     
    1060              :     // fill remaining metadata in the table
    1061          240 :     int count = database_count_pk(data, table_name, false, table->schema);
    1062          240 :     if (count < 0) {cloudsync_set_dberror(data); goto abort_add_table;}
    1063          240 :     table->npks = count;
    1064          240 :     if (table->npks == 0) {
    1065              :         #if CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
    1066            0 :         goto abort_add_table;
    1067              :         #else
    1068              :         table->rowid_only = true;
    1069              :         table->npks = 1; // rowid
    1070              :         #endif
    1071              :     }
    1072              :     
    1073          240 :     int ncols = database_count_nonpk(data, table_name, table->schema);
    1074          240 :     if (ncols < 0) {cloudsync_set_dberror(data); goto abort_add_table;}
    1075          240 :     int rc = table_add_stmts(table, ncols);
    1076          240 :     if (rc != DBRES_OK) goto abort_add_table;
    1077              :     
    1078              :     // a table with only pk(s) is totally legal
    1079          240 :     if (ncols > 0) {
    1080          201 :         table->col_name = (char **)cloudsync_memory_alloc((uint64_t)(sizeof(char *) * ncols));
    1081          201 :         if (!table->col_name) goto abort_add_table;
    1082              :         
    1083          201 :         table->col_id = (int *)cloudsync_memory_alloc((uint64_t)(sizeof(int) * ncols));
    1084          201 :         if (!table->col_id) goto abort_add_table;
    1085              :         
    1086          201 :         table->col_merge_stmt = (dbvm_t **)cloudsync_memory_alloc((uint64_t)(sizeof(void *) * ncols));
    1087          201 :         if (!table->col_merge_stmt) goto abort_add_table;
    1088              :         
    1089          201 :         table->col_value_stmt = (dbvm_t **)cloudsync_memory_alloc((uint64_t)(sizeof(void *) * ncols));
    1090          201 :         if (!table->col_value_stmt) goto abort_add_table;
    1091              : 
    1092          201 :         table->col_algo = (col_algo_t *)cloudsync_memory_zeroalloc((uint64_t)(sizeof(col_algo_t) * ncols));
    1093          201 :         if (!table->col_algo) goto abort_add_table;
    1094              : 
    1095          201 :         table->col_delimiter = (char **)cloudsync_memory_zeroalloc((uint64_t)(sizeof(char *) * ncols));
    1096          201 :         if (!table->col_delimiter) goto abort_add_table;
    1097              : 
    1098              :         // Pass empty string when schema is NULL; SQL will fall back to current_schema()
    1099          201 :         const char *schema = table->schema ? table->schema : "";
    1100          402 :         char *sql = cloudsync_memory_mprintf(SQL_PRAGMA_TABLEINFO_LIST_NONPK_NAME_CID,
    1101          201 :                                               table_name, schema, table_name, schema);
    1102          201 :         if (!sql) goto abort_add_table;
    1103          201 :         rc = database_exec_callback(data, sql, table_add_to_context_cb, (void *)table);
    1104          201 :         cloudsync_memory_free(sql);
    1105          201 :         if (rc == DBRES_ABORT) goto abort_add_table;
    1106          201 :     }
    1107              :     
    1108              :     // append newly created table
    1109          240 :     data->tables[data->tables_count++] = table;
    1110          240 :     return true;
    1111              :     
    1112              : abort_add_table:
    1113            0 :     table_free(table);
    1114            0 :     return false;
    1115          244 : }
    1116              : 
    1117            0 : dbvm_t *cloudsync_colvalue_stmt (cloudsync_context *data, const char *tbl_name, bool *persistent) {
    1118            0 :     dbvm_t *vm = NULL;
    1119            0 :     *persistent = false;
    1120              : 
    1121            0 :     cloudsync_table_context *table = table_lookup(data, tbl_name);
    1122            0 :     if (table) {
    1123            0 :         char *col_name = NULL;
    1124            0 :         if (table->ncols > 0) {
    1125            0 :             col_name = table->col_name[0];
    1126              :             // retrieve col_value precompiled statement
    1127            0 :             vm = table_column_lookup(table, col_name, false, NULL);
    1128            0 :             *persistent = true;
    1129            0 :         } else {
    1130            0 :             char *sql = table_build_value_sql(table, "*");
    1131            0 :             databasevm_prepare(data, sql, (void **)&vm, 0);
    1132            0 :             cloudsync_memory_free(sql);
    1133            0 :             *persistent = false;
    1134              :         }
    1135            0 :     }
    1136              :     
    1137            0 :     return vm;
    1138              : }
    1139              : 
    1140         6806 : bool table_enabled (cloudsync_table_context *table) {
    1141         6806 :     return table->enabled;
    1142              : }
    1143              : 
    1144            6 : void table_set_enabled (cloudsync_table_context *table, bool value) {
    1145            6 :     table->enabled = value;
    1146            6 : }
    1147              : 
    1148        19484 : int table_count_cols (cloudsync_table_context *table) {
    1149        19484 :     return table->ncols;
    1150              : }
    1151              : 
    1152         6622 : int table_count_pks (cloudsync_table_context *table) {
    1153         6622 :     return table->npks;
    1154              : }
    1155              : 
    1156        32650 : const char *table_colname (cloudsync_table_context *table, int index) {
    1157        32650 :     return table->col_name[index];
    1158              : }
    1159              : 
    1160         3513 : bool table_pk_exists (cloudsync_table_context *table, const char *value, size_t len) {
    1161              :     // check if a row with the same primary key already exists
    1162              :     // if so, this means the row might have been previously deleted (sentinel)
    1163         3513 :     return (dbvm_count(table->meta_pkexists_stmt, value, len, DBTYPE_BLOB) > 0);
    1164              : }
    1165              : 
    1166            0 : char **table_pknames (cloudsync_table_context *table) {
    1167            0 :     return table->pk_name;
    1168              : }
    1169              : 
    1170           23 : void table_set_pknames (cloudsync_table_context *table, char **pknames) {
    1171           23 :     table_pknames_free(table->pk_name, table->npks);
    1172           23 :     table->pk_name = pknames;
    1173           23 : }
    1174              : 
    1175        49154 : bool table_algo_isgos (cloudsync_table_context *table) {
    1176        49154 :     return (table->algo == table_algo_crdt_gos);
    1177              : }
    1178              : 
    1179            0 : const char *table_schema (cloudsync_table_context *table) {
    1180            0 :     return table->schema;
    1181              : }
    1182              : 
    1183              : // MARK: - Merge Insert -
    1184              : 
    1185        46004 : int64_t merge_get_local_cl (cloudsync_table_context *table, const char *pk, int pklen) {
    1186        46004 :     dbvm_t *vm = table->meta_local_cl_stmt;
    1187        46004 :     int64_t result = -1;
    1188              :     
    1189        46004 :     int rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
    1190        46004 :     if (rc != DBRES_OK) goto cleanup;
    1191              :     
    1192        46004 :     rc = databasevm_bind_blob(vm, 2, (const void *)pk, pklen);
    1193        46004 :     if (rc != DBRES_OK) goto cleanup;
    1194              :     
    1195        46004 :     rc = databasevm_step(vm);
    1196        46004 :     if (rc == DBRES_ROW) result = database_column_int(vm, 0);
    1197            0 :     else if (rc == DBRES_DONE) result = 0;
    1198              :     
    1199              : cleanup:
    1200        46004 :     if (result == -1) cloudsync_set_dberror(table->context);
    1201        46004 :     dbvm_reset(vm);
    1202        46004 :     return result;
    1203              : }
    1204              : 
    1205        44985 : int merge_get_col_version (cloudsync_table_context *table, const char *col_name, const char *pk, int pklen, int64_t *version) {
    1206        44985 :     dbvm_t *vm = table->meta_col_version_stmt;
    1207              :     
    1208        44985 :     int rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
    1209        44985 :     if (rc != DBRES_OK) goto cleanup;
    1210              :     
    1211        44985 :     rc = databasevm_bind_text(vm, 2, col_name, -1);
    1212        44985 :     if (rc != DBRES_OK) goto cleanup;
    1213              :     
    1214        44985 :     rc = databasevm_step(vm);
    1215        69889 :     if (rc == DBRES_ROW) {
    1216        24904 :         *version = database_column_int(vm, 0);
    1217        24904 :         rc = DBRES_OK;
    1218        24904 :     }
    1219              :     
    1220              : cleanup:
    1221        44985 :     if ((rc != DBRES_OK) && (rc != DBRES_DONE)) cloudsync_set_dberror(table->context);
    1222        44985 :     dbvm_reset(vm);
    1223        44985 :     return rc;
    1224              : }
    1225              : 
    1226        25072 : int merge_set_winner_clock (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pk_len, const char *colname, int64_t col_version, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
    1227              :     
    1228              :     // get/set site_id
    1229        25072 :     dbvm_t *vm = data->getset_siteid_stmt;
    1230        25072 :     int rc = databasevm_bind_blob(vm, 1, (const void *)site_id, site_len);
    1231        25072 :     if (rc != DBRES_OK) goto cleanup_merge;
    1232              :     
    1233        25072 :     rc = databasevm_step(vm);
    1234        25072 :     if (rc != DBRES_ROW) goto cleanup_merge;
    1235              :     
    1236        25072 :     int64_t ord = database_column_int(vm, 0);
    1237        25072 :     dbvm_reset(vm);
    1238              :     
    1239        25072 :     vm = table->meta_winner_clock_stmt;
    1240        25072 :     rc = databasevm_bind_blob(vm, 1, (const void *)pk, pk_len);
    1241        25072 :     if (rc != DBRES_OK) goto cleanup_merge;
    1242              :     
    1243        25072 :     rc = databasevm_bind_text(vm, 2, (colname) ? colname : CLOUDSYNC_TOMBSTONE_VALUE, -1);
    1244        25072 :     if (rc != DBRES_OK) goto cleanup_merge;
    1245              :     
    1246        25072 :     rc = databasevm_bind_int(vm, 3, col_version);
    1247        25072 :     if (rc != DBRES_OK) goto cleanup_merge;
    1248              :     
    1249        25072 :     rc = databasevm_bind_int(vm, 4, db_version);
    1250        25072 :     if (rc != DBRES_OK) goto cleanup_merge;
    1251              :     
    1252        25072 :     rc = databasevm_bind_int(vm, 5, seq);
    1253        25072 :     if (rc != DBRES_OK) goto cleanup_merge;
    1254              :     
    1255        25072 :     rc = databasevm_bind_int(vm, 6, ord);
    1256        25072 :     if (rc != DBRES_OK) goto cleanup_merge;
    1257              :     
    1258        25072 :     rc = databasevm_step(vm);
    1259        50144 :     if (rc == DBRES_ROW) {
    1260        25072 :         *rowid = database_column_int(vm, 0);
    1261        25072 :         rc = DBRES_OK;
    1262        25072 :     }
    1263              :     
    1264              : cleanup_merge:
    1265        25072 :     if (rc != DBRES_OK) cloudsync_set_dberror(data);
    1266        25072 :     dbvm_reset(vm);
    1267        25072 :     return rc;
    1268              : }
    1269              : 
    1270              : // MARK: - Deferred column-batch merge functions -
    1271              : 
    1272        21278 : static int merge_pending_add (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, const char *col_name, dbvalue_t *col_value, int64_t col_version, int64_t db_version, const char *site_id, int site_len, int64_t seq) {
    1273        21278 :     merge_pending_batch *batch = data->pending_batch;
    1274              : 
    1275              :     // Store table and PK on first entry
    1276        21278 :     if (batch->table == NULL) {
    1277         7658 :         batch->table = table;
    1278         7658 :         batch->pk = (char *)cloudsync_memory_alloc(pklen);
    1279         7658 :         if (!batch->pk) return cloudsync_set_error(data, "merge_pending_add: out of memory for pk", DBRES_NOMEM);
    1280         7658 :         memcpy(batch->pk, pk, pklen);
    1281         7658 :         batch->pk_len = pklen;
    1282         7658 :     }
    1283              : 
    1284              :     // Ensure capacity
    1285        21278 :     if (batch->count >= batch->capacity) {
    1286          476 :         int new_cap = batch->capacity ? batch->capacity * 2 : 8;
    1287          476 :         merge_pending_entry *new_entries = (merge_pending_entry *)cloudsync_memory_realloc(batch->entries, new_cap * sizeof(merge_pending_entry));
    1288          476 :         if (!new_entries) return cloudsync_set_error(data, "merge_pending_add: out of memory for entries", DBRES_NOMEM);
    1289          476 :         batch->entries = new_entries;
    1290          476 :         batch->capacity = new_cap;
    1291          476 :     }
    1292              : 
    1293              :     // Resolve col_name to a stable pointer from the table context
    1294              :     // (the incoming col_name may point to VM-owned memory that gets freed on reset)
    1295        21278 :     int col_idx = -1;
    1296        21278 :     table_column_lookup(table, col_name, true, &col_idx);
    1297        21278 :     const char *stable_col_name = (col_idx >= 0) ? table_colname(table, col_idx) : NULL;
    1298        21278 :     if (!stable_col_name) return cloudsync_set_error(data, "merge_pending_add: column not found in table context", DBRES_ERROR);
    1299              : 
    1300        21278 :     merge_pending_entry *e = &batch->entries[batch->count];
    1301        21278 :     e->col_name = stable_col_name;
    1302        21278 :     e->col_value = col_value ? (dbvalue_t *)database_value_dup(col_value) : NULL;
    1303        21278 :     e->col_version = col_version;
    1304        21278 :     e->db_version = db_version;
    1305        21278 :     e->site_id_len = (site_len <= (int)sizeof(e->site_id)) ? site_len : (int)sizeof(e->site_id);
    1306        21278 :     memcpy(e->site_id, site_id, e->site_id_len);
    1307        21278 :     e->seq = seq;
    1308              : 
    1309        21278 :     batch->count++;
    1310        21278 :     return DBRES_OK;
    1311        21278 : }
    1312              : 
    1313        18801 : static void merge_pending_free_entries (merge_pending_batch *batch) {
    1314        18801 :     if (batch->entries) {
    1315        35474 :         for (int i = 0; i < batch->count; i++) {
    1316        21278 :             if (batch->entries[i].col_value) {
    1317        21278 :                 database_value_free(batch->entries[i].col_value);
    1318        21278 :                 batch->entries[i].col_value = NULL;
    1319        21278 :             }
    1320        21278 :         }
    1321        14196 :     }
    1322        18801 :     if (batch->pk) {
    1323         7688 :         cloudsync_memory_free(batch->pk);
    1324         7688 :         batch->pk = NULL;
    1325         7688 :     }
    1326        18801 :     batch->table = NULL;
    1327        18801 :     batch->pk_len = 0;
    1328        18801 :     batch->cl = 0;
    1329        18801 :     batch->sentinel_pending = false;
    1330        18801 :     batch->row_exists = false;
    1331        18801 :     batch->count = 0;
    1332        18801 : }
    1333              : 
    1334        18801 : static int merge_flush_pending (cloudsync_context *data) {
    1335        18801 :     merge_pending_batch *batch = data->pending_batch;
    1336        18801 :     if (!batch) return DBRES_OK;
    1337              : 
    1338        18801 :     int rc = DBRES_OK;
    1339        18801 :     bool flush_savepoint = false;
    1340              : 
    1341              :     // Nothing to write — handle sentinel-only case or skip
    1342        18801 :     if (batch->count == 0 && !(batch->sentinel_pending && batch->table)) {
    1343        11113 :         goto cleanup;
    1344              :     }
    1345              : 
    1346              :     // Wrap database operations in a savepoint so that on failure (e.g. RLS
    1347              :     // denial) the rollback properly releases all executor resources (open
    1348              :     // relations, snapshots, plan cache) acquired during the failed statement.
    1349         7688 :     flush_savepoint = (database_begin_savepoint(data, "merge_flush") == DBRES_OK);
    1350              : 
    1351         7688 :     if (batch->count == 0) {
    1352              :         // Sentinel with no winning columns (PK-only row)
    1353           30 :         dbvm_t *vm = batch->table->real_merge_sentinel_stmt;
    1354           30 :         rc = pk_decode_prikey(batch->pk, (size_t)batch->pk_len, pk_decode_bind_callback, vm);
    1355           30 :         if (rc < 0) {
    1356            0 :             cloudsync_set_dberror(data);
    1357            0 :             dbvm_reset(vm);
    1358            0 :             goto cleanup;
    1359              :         }
    1360           30 :         SYNCBIT_SET(data);
    1361           30 :         rc = databasevm_step(vm);
    1362           30 :         dbvm_reset(vm);
    1363           30 :         SYNCBIT_RESET(data);
    1364           30 :         if (rc == DBRES_DONE) rc = DBRES_OK;
    1365           30 :         if (rc != DBRES_OK) {
    1366            0 :             cloudsync_set_dberror(data);
    1367            0 :             goto cleanup;
    1368              :         }
    1369           30 :         goto cleanup;
    1370              :     }
    1371              : 
    1372              :     // Check if cached prepared statement can be reused
    1373         7658 :     cloudsync_table_context *table = batch->table;
    1374         7658 :     dbvm_t *vm = NULL;
    1375         7658 :     bool cache_hit = false;
    1376              : 
    1377        14543 :     if (batch->cached_vm &&
    1378         7182 :         batch->cached_row_exists == batch->row_exists &&
    1379         6885 :         batch->cached_col_count == batch->count) {
    1380         6864 :         cache_hit = true;
    1381        26550 :         for (int i = 0; i < batch->count; i++) {
    1382        19711 :             if (batch->cached_col_names[i] != batch->entries[i].col_name) {
    1383           25 :                 cache_hit = false;
    1384           25 :                 break;
    1385              :             }
    1386        19686 :         }
    1387         6864 :     }
    1388              : 
    1389         7658 :     if (cache_hit) {
    1390         6839 :         vm = batch->cached_vm;
    1391         6839 :         dbvm_reset(vm);
    1392         6839 :     } else {
    1393              :         // Invalidate old cache
    1394          819 :         if (batch->cached_vm) {
    1395          343 :             databasevm_finalize(batch->cached_vm);
    1396          343 :             batch->cached_vm = NULL;
    1397          343 :         }
    1398              : 
    1399              :         // Build multi-column SQL
    1400          819 :         const char **colnames = (const char **)cloudsync_memory_alloc(batch->count * sizeof(const char *));
    1401          819 :         if (!colnames) {
    1402            0 :             rc = cloudsync_set_error(data, "merge_flush_pending: out of memory", DBRES_NOMEM);
    1403            0 :             goto cleanup;
    1404              :         }
    1405         2411 :         for (int i = 0; i < batch->count; i++) {
    1406         1592 :             colnames[i] = batch->entries[i].col_name;
    1407         1592 :         }
    1408              : 
    1409          819 :         char *sql = batch->row_exists
    1410          406 :             ? sql_build_update_pk_and_multi_cols(data, table->name, colnames, batch->count, table->schema)
    1411          413 :             : sql_build_upsert_pk_and_multi_cols(data, table->name, colnames, batch->count, table->schema);
    1412          819 :         cloudsync_memory_free(colnames);
    1413              : 
    1414          819 :         if (!sql) {
    1415            0 :             rc = cloudsync_set_error(data, "merge_flush_pending: unable to build multi-column upsert SQL", DBRES_ERROR);
    1416            0 :             goto cleanup;
    1417              :         }
    1418              : 
    1419          819 :         rc = databasevm_prepare(data, sql, &vm, 0);
    1420          819 :         cloudsync_memory_free(sql);
    1421          819 :         if (rc != DBRES_OK) {
    1422            0 :             rc = cloudsync_set_error(data, "merge_flush_pending: unable to prepare statement", rc);
    1423            0 :             goto cleanup;
    1424              :         }
    1425              : 
    1426              :         // Update cache
    1427          819 :         batch->cached_vm = vm;
    1428          819 :         batch->cached_row_exists = batch->row_exists;
    1429          819 :         batch->cached_col_count = batch->count;
    1430              :         // Reallocate cached_col_names if needed
    1431          819 :         if (batch->cached_col_count > 0) {
    1432          819 :             const char **new_names = (const char **)cloudsync_memory_realloc(
    1433          819 :                 batch->cached_col_names, batch->count * sizeof(const char *));
    1434          819 :             if (new_names) {
    1435         2411 :                 for (int i = 0; i < batch->count; i++) {
    1436         1592 :                     new_names[i] = batch->entries[i].col_name;
    1437         1592 :                 }
    1438          819 :                 batch->cached_col_names = new_names;
    1439          819 :             }
    1440          819 :         }
    1441              :     }
    1442              : 
    1443              :     // Bind PKs (positions 1..npks)
    1444         7658 :     int npks = pk_decode_prikey(batch->pk, (size_t)batch->pk_len, pk_decode_bind_callback, vm);
    1445         7658 :     if (npks < 0) {
    1446            0 :         cloudsync_set_dberror(data);
    1447            0 :         dbvm_reset(vm);
    1448            0 :         rc = DBRES_ERROR;
    1449            0 :         goto cleanup;
    1450              :     }
    1451              : 
    1452              :     // Bind column values (positions npks+1..npks+count)
    1453        28936 :     for (int i = 0; i < batch->count; i++) {
    1454        21278 :         merge_pending_entry *e = &batch->entries[i];
    1455        21278 :         int bind_idx = npks + 1 + i;
    1456        21278 :         if (e->col_value) {
    1457        21278 :             rc = databasevm_bind_value(vm, bind_idx, e->col_value);
    1458        21278 :         } else {
    1459            0 :             rc = databasevm_bind_null(vm, bind_idx);
    1460              :         }
    1461        21278 :         if (rc != DBRES_OK) {
    1462            0 :             cloudsync_set_dberror(data);
    1463            0 :             dbvm_reset(vm);
    1464            0 :             goto cleanup;
    1465              :         }
    1466        21278 :     }
    1467              : 
    1468              :     // Execute with SYNCBIT and GOS handling
    1469         7658 :     if (table->algo == table_algo_crdt_gos) table->enabled = 0;
    1470         7658 :     SYNCBIT_SET(data);
    1471         7658 :     rc = databasevm_step(vm);
    1472         7658 :     dbvm_reset(vm);
    1473         7658 :     SYNCBIT_RESET(data);
    1474         7658 :     if (table->algo == table_algo_crdt_gos) table->enabled = 1;
    1475              : 
    1476         7658 :     if (rc != DBRES_DONE) {
    1477            3 :         cloudsync_set_dberror(data);
    1478            3 :         goto cleanup;
    1479              :     }
    1480         7655 :     rc = DBRES_OK;
    1481              : 
    1482              :     // Call merge_set_winner_clock for each buffered entry
    1483         7655 :     int64_t rowid = 0;
    1484        28925 :     for (int i = 0; i < batch->count; i++) {
    1485        21270 :         merge_pending_entry *e = &batch->entries[i];
    1486        42540 :         int clock_rc = merge_set_winner_clock(data, table, batch->pk, batch->pk_len,
    1487        21270 :                                                e->col_name, e->col_version, e->db_version,
    1488        21270 :                                                (const char *)e->site_id, e->site_id_len,
    1489        21270 :                                                e->seq, &rowid);
    1490        21270 :         if (clock_rc != DBRES_OK) {
    1491            0 :             rc = clock_rc;
    1492            0 :             goto cleanup;
    1493              :         }
    1494        28925 :     }
    1495              : 
    1496              : cleanup:
    1497        18801 :     merge_pending_free_entries(batch);
    1498        18801 :     if (flush_savepoint) {
    1499         7688 :         if (rc == DBRES_OK) database_commit_savepoint(data, "merge_flush");
    1500            3 :         else database_rollback_savepoint(data, "merge_flush");
    1501         7688 :     }
    1502        18801 :     return rc;
    1503        18801 : }
    1504              : 
    1505         3167 : int merge_insert_col (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *col_name, dbvalue_t *col_value, int64_t col_version, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
    1506              :     int index;
    1507         3167 :     dbvm_t *vm = table_column_lookup(table, col_name, true, &index);
    1508         3167 :     if (vm == NULL) return cloudsync_set_error(data, "Unable to retrieve column merge precompiled statement in merge_insert_col", DBRES_MISUSE);
    1509              :     
    1510              :     // INSERT INTO table (pk1, pk2, col_name) VALUES (?, ?, ?) ON CONFLICT DO UPDATE SET col_name=?;"
    1511              :     
    1512              :     // bind primary key(s)
    1513         3167 :     int rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
    1514         3167 :     if (rc < 0) {
    1515            0 :         cloudsync_set_dberror(data);
    1516            0 :         dbvm_reset(vm);
    1517            0 :         return rc;
    1518              :     }
    1519              :     
    1520              :     // bind value (always bind all expected parameters for correct prepared statement handling)
    1521         3167 :     if (col_value) {
    1522         3167 :         rc = databasevm_bind_value(vm, table->npks+1, col_value);
    1523         3167 :         if (rc == DBRES_OK) rc = databasevm_bind_value(vm, table->npks+2, col_value);
    1524         3167 :     } else {
    1525            0 :         rc = databasevm_bind_null(vm, table->npks+1);
    1526            0 :         if (rc == DBRES_OK) rc = databasevm_bind_null(vm, table->npks+2);
    1527              :     }
    1528         3167 :     if (rc != DBRES_OK) {
    1529            0 :         cloudsync_set_dberror(data);
    1530            0 :         dbvm_reset(vm);
    1531            0 :         return rc;
    1532              :     }
    1533              : 
    1534              :     // perform real operation and disable triggers
    1535              :     
    1536              :     // in case of GOS we reused the table->col_merge_stmt statement
    1537              :     // which looks like: INSERT INTO table (pk1, pk2, col_name) VALUES (?, ?, ?) ON CONFLICT DO UPDATE SET col_name=?;"
    1538              :     // but the UPDATE in the CONFLICT statement would return SQLITE_CONSTRAINT because the trigger raises the error
    1539              :     // the trick is to disable that trigger before executing the statement
    1540         3167 :     if (table->algo == table_algo_crdt_gos) table->enabled = 0;
    1541         3167 :     SYNCBIT_SET(data);
    1542         3167 :     rc = databasevm_step(vm);
    1543              :     DEBUG_MERGE("merge_insert(%02x%02x): %s (%d)", data->site_id[UUID_LEN-2], data->site_id[UUID_LEN-1], databasevm_sql(vm), rc);
    1544         3167 :     dbvm_reset(vm);
    1545         3167 :     SYNCBIT_RESET(data);
    1546         3167 :     if (table->algo == table_algo_crdt_gos) table->enabled = 1;
    1547              :     
    1548         3167 :     if (rc != DBRES_DONE) {
    1549            0 :         cloudsync_set_dberror(data);
    1550            0 :         return rc;
    1551              :     }
    1552              :     
    1553         3167 :     return merge_set_winner_clock(data, table, pk, pklen, col_name, col_version, db_version, site_id, site_len, seq, rowid);
    1554         3167 : }
    1555              : 
    1556          163 : int merge_delete (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, const char *colname, int64_t cl, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
    1557          163 :     int rc = DBRES_OK;
    1558              :     
    1559              :     // reset return value
    1560          163 :     *rowid = 0;
    1561              :     
    1562              :     // bind pk
    1563          163 :     dbvm_t *vm = table->real_merge_delete_stmt;
    1564          163 :     rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
    1565          163 :     if (rc < 0) {
    1566            0 :         rc = cloudsync_set_dberror(data);
    1567            0 :         dbvm_reset(vm);
    1568            0 :         return rc;
    1569              :     }
    1570              :     
    1571              :     // perform real operation and disable triggers
    1572          163 :     SYNCBIT_SET(data);
    1573          163 :     rc = databasevm_step(vm);
    1574              :     DEBUG_MERGE("merge_delete(%02x%02x): %s (%d)", data->site_id[UUID_LEN-2], data->site_id[UUID_LEN-1], databasevm_sql(vm), rc);
    1575          163 :     dbvm_reset(vm);
    1576          163 :     SYNCBIT_RESET(data);
    1577          163 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    1578          163 :     if (rc != DBRES_OK) {
    1579            0 :         cloudsync_set_dberror(data);
    1580            0 :         return rc;
    1581              :     }
    1582              :     
    1583          163 :     rc = merge_set_winner_clock(data, table, pk, pklen, colname, cl, db_version, site_id, site_len, seq, rowid);
    1584          163 :     if (rc != DBRES_OK) return rc;
    1585              :     
    1586              :     // drop clocks _after_ setting the winner clock so we don't lose track of the max db_version!!
    1587              :     // this must never come before `set_winner_clock`
    1588          163 :     vm = table->meta_merge_delete_drop;
    1589          163 :     rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
    1590          163 :     if (rc == DBRES_OK) rc = databasevm_step(vm);
    1591          163 :     dbvm_reset(vm);
    1592              :     
    1593          163 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    1594          163 :     if (rc != DBRES_OK) cloudsync_set_dberror(data);
    1595          163 :     return rc;
    1596          163 : }
    1597              : 
    1598           60 : int merge_zeroclock_on_resurrect(cloudsync_table_context *table, int64_t db_version, const char *pk, int pklen) {
    1599           60 :     dbvm_t *vm = table->meta_zero_clock_stmt;
    1600              :     
    1601           60 :     int rc = databasevm_bind_int(vm, 1, db_version);
    1602           60 :     if (rc != DBRES_OK) goto cleanup;
    1603              :     
    1604           60 :     rc = databasevm_bind_blob(vm, 2, (const void *)pk, pklen);
    1605           60 :     if (rc != DBRES_OK) goto cleanup;
    1606              :     
    1607           60 :     rc = databasevm_step(vm);
    1608           60 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    1609              :     
    1610              : cleanup:
    1611           60 :     if (rc != DBRES_OK) cloudsync_set_dberror(table->context);
    1612           60 :     dbvm_reset(vm);
    1613           60 :     return rc;
    1614              : }
    1615              : 
    1616              : // executed only if insert_cl == local_cl
    1617        44985 : int merge_did_cid_win (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, dbvalue_t *insert_value, const char *site_id, int site_len, const char *col_name, int64_t col_version, bool *didwin_flag) {
    1618              :     
    1619        44985 :     if (col_name == NULL) col_name = CLOUDSYNC_TOMBSTONE_VALUE;
    1620              :     
    1621              :     int64_t local_version;
    1622        44985 :     int rc = merge_get_col_version(table, col_name, pk, pklen, &local_version);
    1623        44985 :     if (rc == DBRES_DONE) {
    1624              :         // no rows returned, the incoming change wins if there's nothing there locally
    1625        20081 :         *didwin_flag = true;
    1626        20081 :         return DBRES_OK;
    1627              :     }
    1628        24904 :     if (rc != DBRES_OK) return rc;
    1629              :     
    1630              :     // rc == DBRES_OK, means that a row with a version exists
    1631        24904 :     if (local_version != col_version) {
    1632         1969 :         if (col_version > local_version) {*didwin_flag = true; return DBRES_OK;}
    1633          743 :         if (col_version < local_version) {*didwin_flag = false; return DBRES_OK;}
    1634            0 :     }
    1635              :     
    1636              :     // rc == DBRES_ROW and col_version == local_version, need to compare values
    1637              : 
    1638              :     // retrieve col_value precompiled statement
    1639        22935 :     bool is_block_col = block_is_block_colname(col_name) && table_has_block_cols(table);
    1640              :     dbvm_t *vm;
    1641        22935 :     if (is_block_col) {
    1642              :         // Block column: read value from blocks table (pk + col_name bindings)
    1643           43 :         vm = table_block_value_read_stmt(table);
    1644           43 :         if (!vm) return cloudsync_set_error(data, "Unable to retrieve block value read statement in merge_did_cid_win", DBRES_ERROR);
    1645           43 :         rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
    1646           43 :         if (rc != DBRES_OK) { dbvm_reset(vm); return cloudsync_set_dberror(data); }
    1647           43 :         rc = databasevm_bind_text(vm, 2, col_name, -1);
    1648           43 :         if (rc != DBRES_OK) { dbvm_reset(vm); return cloudsync_set_dberror(data); }
    1649           43 :     } else {
    1650        22892 :         vm = table_column_lookup(table, col_name, false, NULL);
    1651        22892 :         if (!vm) return cloudsync_set_error(data, "Unable to retrieve column value precompiled statement in merge_did_cid_win", DBRES_ERROR);
    1652              : 
    1653              :         // bind primary key values
    1654        22892 :         rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, (void *)vm);
    1655        22892 :         if (rc < 0) {
    1656            0 :             rc = cloudsync_set_dberror(data);
    1657            0 :             dbvm_reset(vm);
    1658            0 :             return rc;
    1659              :         }
    1660              :     }
    1661              :         
    1662              :     // execute vm
    1663              :     dbvalue_t *local_value;
    1664        22935 :     rc = databasevm_step(vm);
    1665        22935 :     if (rc == DBRES_DONE) {
    1666              :         // meta entry exists but the actual value is missing
    1667              :         // we should allow the value_compare function to make a decision
    1668              :         // value_compare has been modified to handle the case where lvalue is NULL
    1669            2 :         local_value = NULL;
    1670            2 :         rc = DBRES_OK;
    1671        22935 :     } else if (rc == DBRES_ROW) {
    1672        22933 :         local_value = database_column_value(vm, 0);
    1673        22933 :         rc = DBRES_OK;
    1674        22933 :     } else {
    1675            0 :         goto cleanup;
    1676              :     }
    1677              :     
    1678              :     // compare values
    1679        22935 :     int ret = dbutils_value_compare(insert_value, local_value);
    1680              :     // reset after compare, otherwise local value would be deallocated
    1681        22935 :     dbvm_reset(vm);
    1682        22935 :     vm = NULL;
    1683              :     
    1684        22935 :     bool compare_site_id = (ret == 0 && data->merge_equal_values == true);
    1685        22935 :     if (!compare_site_id) {
    1686        22935 :         *didwin_flag = (ret > 0);
    1687        22935 :         goto cleanup;
    1688              :     }
    1689              :     
    1690              :     // values are the same and merge_equal_values is true
    1691            0 :     vm = table->meta_site_id_stmt;
    1692            0 :     rc = databasevm_bind_blob(vm, 1, (const void *)pk, pklen);
    1693            0 :     if (rc != DBRES_OK) goto cleanup;
    1694              :     
    1695            0 :     rc = databasevm_bind_text(vm, 2, col_name, -1);
    1696            0 :     if (rc != DBRES_OK) goto cleanup;
    1697              :     
    1698            0 :     rc = databasevm_step(vm);
    1699            0 :     if (rc == DBRES_ROW) {
    1700            0 :         const void *local_site_id = database_column_blob(vm, 0, NULL);
    1701            0 :         if (!local_site_id) {
    1702            0 :             dbvm_reset(vm);
    1703            0 :             return cloudsync_set_error(data, "NULL site_id in cloudsync table, table is probably corrupted", DBRES_ERROR);
    1704              :         }
    1705            0 :         ret = memcmp(site_id, local_site_id, site_len);
    1706            0 :         *didwin_flag = (ret > 0);
    1707            0 :         dbvm_reset(vm);
    1708            0 :         return DBRES_OK;
    1709              :     }
    1710              :     
    1711              :     // handle error condition here
    1712            0 :     dbvm_reset(vm);
    1713            0 :     return cloudsync_set_error(data, "Unable to find site_id for previous change, cloudsync table is probably corrupted", DBRES_ERROR);
    1714              :     
    1715              : cleanup:
    1716        22935 :     if (rc != DBRES_OK) cloudsync_set_dberror(data);
    1717        22935 :     dbvm_reset(vm);
    1718        22935 :     return rc;
    1719        44985 : }
    1720              : 
    1721           60 : int merge_sentinel_only_insert (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, int64_t cl, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid) {
    1722              : 
    1723              :     // reset return value
    1724           60 :     *rowid = 0;
    1725              : 
    1726           60 :     if (data->pending_batch == NULL) {
    1727              :         // Immediate mode: execute base table INSERT
    1728            0 :         dbvm_t *vm = table->real_merge_sentinel_stmt;
    1729            0 :         int rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
    1730            0 :         if (rc < 0) {
    1731            0 :             rc = cloudsync_set_dberror(data);
    1732            0 :             dbvm_reset(vm);
    1733            0 :             return rc;
    1734              :         }
    1735              : 
    1736            0 :         SYNCBIT_SET(data);
    1737            0 :         rc = databasevm_step(vm);
    1738            0 :         dbvm_reset(vm);
    1739            0 :         SYNCBIT_RESET(data);
    1740            0 :         if (rc == DBRES_DONE) rc = DBRES_OK;
    1741            0 :         if (rc != DBRES_OK) {
    1742            0 :             cloudsync_set_dberror(data);
    1743            0 :             return rc;
    1744              :         }
    1745            0 :     } else {
    1746              :         // Batch mode: skip base table INSERT, the batch flush will create the row
    1747           60 :         merge_pending_batch *batch = data->pending_batch;
    1748           60 :         batch->sentinel_pending = true;
    1749           60 :         if (batch->table == NULL) {
    1750           30 :             batch->table = table;
    1751           30 :             batch->pk = (char *)cloudsync_memory_alloc(pklen);
    1752           30 :             if (!batch->pk) return cloudsync_set_error(data, "merge_sentinel_only_insert: out of memory for pk", DBRES_NOMEM);
    1753           30 :             memcpy(batch->pk, pk, pklen);
    1754           30 :             batch->pk_len = pklen;
    1755           30 :         }
    1756              :     }
    1757              : 
    1758              :     // Metadata operations always execute regardless of batch mode
    1759           60 :     int rc = merge_zeroclock_on_resurrect(table, db_version, pk, pklen);
    1760           60 :     if (rc != DBRES_OK) return rc;
    1761              : 
    1762           60 :     return merge_set_winner_clock(data, table, pk, pklen, NULL, cl, db_version, site_id, site_len, seq, rowid);
    1763           60 : }
    1764              : 
    1765              : // MARK: - Block-level merge helpers -
    1766              : 
    1767              : // Store a block value in the blocks table
    1768          379 : static int block_store_value (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *block_colname, dbvalue_t *col_value) {
    1769          379 :     dbvm_t *vm = table->block_value_write_stmt;
    1770          379 :     if (!vm) return cloudsync_set_error(data, "block_store_value: blocks table not initialized", DBRES_MISUSE);
    1771              : 
    1772          379 :     int rc = databasevm_bind_blob(vm, 1, pk, pklen);
    1773          379 :     if (rc != DBRES_OK) goto cleanup;
    1774          379 :     rc = databasevm_bind_text(vm, 2, block_colname, -1);
    1775          379 :     if (rc != DBRES_OK) goto cleanup;
    1776          379 :     if (col_value) {
    1777          379 :         rc = databasevm_bind_value(vm, 3, col_value);
    1778          379 :     } else {
    1779            0 :         rc = databasevm_bind_null(vm, 3);
    1780              :     }
    1781          379 :     if (rc != DBRES_OK) goto cleanup;
    1782              : 
    1783          379 :     rc = databasevm_step(vm);
    1784          379 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    1785              : 
    1786              : cleanup:
    1787          379 :     if (rc != DBRES_OK) cloudsync_set_dberror(data);
    1788          379 :     databasevm_reset(vm);
    1789          379 :     return rc;
    1790          379 : }
    1791              : 
    1792              : // Delete a block value from the blocks table
    1793           72 : static int block_delete_value (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *block_colname) {
    1794           72 :     dbvm_t *vm = table->block_value_delete_stmt;
    1795           72 :     if (!vm) return cloudsync_set_error(data, "block_delete_value: blocks table not initialized", DBRES_MISUSE);
    1796              : 
    1797           72 :     int rc = databasevm_bind_blob(vm, 1, pk, pklen);
    1798           72 :     if (rc != DBRES_OK) goto cleanup;
    1799           72 :     rc = databasevm_bind_text(vm, 2, block_colname, -1);
    1800           72 :     if (rc != DBRES_OK) goto cleanup;
    1801              : 
    1802           72 :     rc = databasevm_step(vm);
    1803           72 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    1804              : 
    1805              : cleanup:
    1806           72 :     if (rc != DBRES_OK) cloudsync_set_dberror(data);
    1807           72 :     databasevm_reset(vm);
    1808           72 :     return rc;
    1809           72 : }
    1810              : 
    1811              : // Materialize all alive blocks for a base column into the base table
    1812          462 : int block_materialize_column (cloudsync_context *data, cloudsync_table_context *table, const void *pk, int pklen, const char *base_col_name) {
    1813          462 :     if (!table->block_list_stmt) return cloudsync_set_error(data, "block_materialize_column: blocks table not initialized", DBRES_MISUSE);
    1814              : 
    1815              :     // Find column index and delimiter
    1816          462 :     int col_idx = -1;
    1817          468 :     for (int i = 0; i < table->ncols; i++) {
    1818          468 :         if (strcasecmp(table->col_name[i], base_col_name) == 0) {
    1819          462 :             col_idx = i;
    1820          462 :             break;
    1821              :         }
    1822            6 :     }
    1823          462 :     if (col_idx < 0) return cloudsync_set_error(data, "block_materialize_column: column not found", DBRES_ERROR);
    1824          462 :     const char *delimiter = table->col_delimiter[col_idx] ? table->col_delimiter[col_idx] : BLOCK_DEFAULT_DELIMITER;
    1825              : 
    1826              :     // Build the LIKE pattern for block col_names: "base_col\x1F%"
    1827          462 :     char *like_pattern = block_build_colname(base_col_name, "%");
    1828          462 :     if (!like_pattern) return DBRES_NOMEM;
    1829              : 
    1830              :     // Query alive blocks from blocks table joined with metadata
    1831              :     // block_list_stmt: SELECT b.col_value FROM blocks b JOIN meta m
    1832              :     //   ON b.pk = m.pk AND b.col_name = m.col_name
    1833              :     //   WHERE b.pk = ? AND b.col_name LIKE ? AND m.col_version % 2 = 1
    1834              :     //   ORDER BY b.col_name
    1835          462 :     dbvm_t *vm = table->block_list_stmt;
    1836          462 :     int rc = databasevm_bind_blob(vm, 1, pk, pklen);
    1837          462 :     if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
    1838          462 :     rc = databasevm_bind_text(vm, 2, like_pattern, -1);
    1839          462 :     if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
    1840              :     // Bind pk again for the join condition (parameter 3)
    1841          462 :     rc = databasevm_bind_blob(vm, 3, pk, pklen);
    1842          462 :     if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
    1843          462 :     rc = databasevm_bind_text(vm, 4, like_pattern, -1);
    1844          462 :     if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_reset(vm); return rc; }
    1845              : 
    1846              :     // Collect block values
    1847          462 :     const char **block_values = NULL;
    1848          462 :     int block_count = 0;
    1849          462 :     int block_cap = 0;
    1850              : 
    1851        22695 :     while ((rc = databasevm_step(vm)) == DBRES_ROW) {
    1852        22233 :         const char *value = database_column_text(vm, 0);
    1853        22233 :         if (block_count >= block_cap) {
    1854         1076 :             int new_cap = block_cap ? block_cap * 2 : 16;
    1855         1076 :             const char **new_arr = (const char **)cloudsync_memory_realloc((void *)block_values, (uint64_t)(new_cap * sizeof(char *)));
    1856         1076 :             if (!new_arr) { rc = DBRES_NOMEM; break; }
    1857         1076 :             block_values = new_arr;
    1858         1076 :             block_cap = new_cap;
    1859         1076 :         }
    1860        22233 :         block_values[block_count] = value ? cloudsync_string_dup(value) : cloudsync_string_dup("");
    1861        22233 :         block_count++;
    1862              :     }
    1863          462 :     databasevm_reset(vm);
    1864          462 :     cloudsync_memory_free(like_pattern);
    1865              : 
    1866          462 :     if (rc != DBRES_DONE && rc != DBRES_OK && rc != DBRES_ROW) {
    1867              :         // Free collected values
    1868            0 :         for (int i = 0; i < block_count; i++) cloudsync_memory_free((void *)block_values[i]);
    1869            0 :         if (block_values) cloudsync_memory_free((void *)block_values);
    1870            0 :         return cloudsync_set_dberror(data);
    1871              :     }
    1872              : 
    1873              :     // Materialize text (NULL when no alive blocks)
    1874          462 :     char *text = (block_count > 0) ? block_materialize_text(block_values, block_count, delimiter) : NULL;
    1875        22695 :     for (int i = 0; i < block_count; i++) cloudsync_memory_free((void *)block_values[i]);
    1876          462 :     if (block_values) cloudsync_memory_free((void *)block_values);
    1877          462 :     if (block_count > 0 && !text) return DBRES_NOMEM;
    1878              : 
    1879              :     // Update the base table column via the col_merge_stmt (with triggers disabled)
    1880          462 :     dbvm_t *merge_vm = table->col_merge_stmt[col_idx];
    1881          462 :     if (!merge_vm) { cloudsync_memory_free(text); return DBRES_ERROR; }
    1882              : 
    1883              :     // Bind PKs
    1884          462 :     rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, merge_vm);
    1885          462 :     if (rc < 0) { cloudsync_memory_free(text); databasevm_reset(merge_vm); return DBRES_ERROR; }
    1886              : 
    1887              :     // Bind the text value twice (INSERT value + ON CONFLICT UPDATE value)
    1888          462 :     int npks = table->npks;
    1889          462 :     if (text) {
    1890          458 :         rc = databasevm_bind_text(merge_vm, npks + 1, text, -1);
    1891          458 :         if (rc != DBRES_OK) { cloudsync_memory_free(text); databasevm_reset(merge_vm); return rc; }
    1892          458 :         rc = databasevm_bind_text(merge_vm, npks + 2, text, -1);
    1893          458 :         if (rc != DBRES_OK) { cloudsync_memory_free(text); databasevm_reset(merge_vm); return rc; }
    1894          458 :     } else {
    1895            4 :         rc = databasevm_bind_null(merge_vm, npks + 1);
    1896            4 :         if (rc != DBRES_OK) { databasevm_reset(merge_vm); return rc; }
    1897            4 :         rc = databasevm_bind_null(merge_vm, npks + 2);
    1898            4 :         if (rc != DBRES_OK) { databasevm_reset(merge_vm); return rc; }
    1899              :     }
    1900              : 
    1901              :     // Execute with triggers disabled
    1902          462 :     table->enabled = 0;
    1903          462 :     SYNCBIT_SET(data);
    1904          462 :     rc = databasevm_step(merge_vm);
    1905          462 :     databasevm_reset(merge_vm);
    1906          462 :     SYNCBIT_RESET(data);
    1907          462 :     table->enabled = 1;
    1908              : 
    1909          462 :     cloudsync_memory_free(text);
    1910              : 
    1911          462 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    1912          462 :     if (rc != DBRES_OK) return cloudsync_set_dberror(data);
    1913          462 :     return DBRES_OK;
    1914          462 : }
    1915              : 
    1916              : // Accessor for has_block_cols flag
    1917         1024 : bool table_has_block_cols (cloudsync_table_context *table) {
    1918         1024 :     return table && table->has_block_cols;
    1919              : }
    1920              : 
    1921              : // Get block column algo for a given column index
    1922        11382 : col_algo_t table_col_algo (cloudsync_table_context *table, int index) {
    1923        11382 :     if (!table || !table->col_algo || index < 0 || index >= table->ncols) return col_algo_normal;
    1924        11382 :     return table->col_algo[index];
    1925        11382 : }
    1926              : 
    1927              : // Get block delimiter for a given column index
    1928          133 : const char *table_col_delimiter (cloudsync_table_context *table, int index) {
    1929          133 :     if (!table || !table->col_delimiter || index < 0 || index >= table->ncols) return BLOCK_DEFAULT_DELIMITER;
    1930          133 :     return table->col_delimiter[index] ? table->col_delimiter[index] : BLOCK_DEFAULT_DELIMITER;
    1931          133 : }
    1932              : 
    1933              : // Block column struct accessors (for use outside cloudsync.c where struct is opaque)
    1934         1024 : dbvm_t *table_block_value_read_stmt (cloudsync_table_context *table) { return table ? table->block_value_read_stmt : NULL; }
    1935          496 : dbvm_t *table_block_value_write_stmt (cloudsync_table_context *table) { return table ? table->block_value_write_stmt : NULL; }
    1936           91 : dbvm_t *table_block_list_stmt (cloudsync_table_context *table) { return table ? table->block_list_stmt : NULL; }
    1937           91 : const char *table_blocks_ref (cloudsync_table_context *table) { return table ? table->blocks_ref : NULL; }
    1938              : 
    1939            2 : void table_set_col_delimiter (cloudsync_table_context *table, int col_idx, const char *delimiter) {
    1940            2 :     if (!table || !table->col_delimiter || col_idx < 0 || col_idx >= table->ncols) return;
    1941            2 :     if (table->col_delimiter[col_idx]) cloudsync_memory_free(table->col_delimiter[col_idx]);
    1942            2 :     table->col_delimiter[col_idx] = delimiter ? cloudsync_string_dup(delimiter) : NULL;
    1943            2 : }
    1944              : 
    1945              : // Find column index by name
    1946          121 : int table_col_index (cloudsync_table_context *table, const char *col_name) {
    1947          121 :     if (!table || !col_name) return -1;
    1948          125 :     for (int i = 0; i < table->ncols; i++) {
    1949          125 :         if (strcasecmp(table->col_name[i], col_name) == 0) return i;
    1950            4 :     }
    1951            0 :     return -1;
    1952          121 : }
    1953              : 
    1954        46004 : int merge_insert (cloudsync_context *data, cloudsync_table_context *table, const char *insert_pk, int insert_pk_len, int64_t insert_cl, const char *insert_name, dbvalue_t *insert_value, int64_t insert_col_version, int64_t insert_db_version, const char *insert_site_id, int insert_site_id_len, int64_t insert_seq, int64_t *rowid) {
    1955              :     // Handle DWS and AWS algorithms here
    1956              :     // Delete-Wins Set (DWS): table_algo_crdt_dws
    1957              :     // Add-Wins Set (AWS): table_algo_crdt_aws
    1958              :     
    1959              :     // Causal-Length Set (CLS) Algorithm (default)
    1960              :     
    1961              :     // compute the local causal length for the row based on the primary key
    1962              :     // the causal length is used to determine the order of operations and resolve conflicts.
    1963        46004 :     int64_t local_cl = merge_get_local_cl(table, insert_pk, insert_pk_len);
    1964        46004 :     if (local_cl < 0) return cloudsync_set_error(data, "Unable to compute local causal length", DBRES_ERROR);
    1965              :     
    1966              :     // if the incoming causal length is older than the local causal length, we can safely ignore it
    1967              :     // because the local changes are more recent
    1968        46004 :     if (insert_cl < local_cl) return DBRES_OK;
    1969              :     
    1970              :     // check if the operation is a delete by examining the causal length
    1971              :     // even causal lengths typically signify delete operations
    1972        45792 :     bool is_delete = (insert_cl % 2 == 0);
    1973        45792 :     if (is_delete) {
    1974              :         // if it's a delete, check if the local state is at the same causal length
    1975              :         // if it is, no further action is needed
    1976          614 :         if (local_cl == insert_cl) return DBRES_OK;
    1977              :         
    1978              :         // perform a delete merge if the causal length is newer than the local one
    1979          326 :         int rc = merge_delete(data, table, insert_pk, insert_pk_len, insert_name, insert_col_version,
    1980          163 :                               insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
    1981          163 :         if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_delete", rc);
    1982          163 :         return rc;
    1983              :     }
    1984              :     
    1985              :     // if the operation is a sentinel-only insert (indicating a new row or resurrected row with no column update), handle it separately.
    1986        45178 :     bool is_sentinel_only = (strcmp(insert_name, CLOUDSYNC_TOMBSTONE_VALUE) == 0);
    1987        45178 :     if (is_sentinel_only) {
    1988          193 :         if (local_cl == insert_cl) return DBRES_OK;
    1989              :         
    1990              :         // perform a sentinel-only insert to track the existence of the row
    1991          120 :         int rc = merge_sentinel_only_insert(data, table, insert_pk, insert_pk_len, insert_col_version,
    1992           60 :                                             insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
    1993           60 :         if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_sentinel_only_insert", rc);
    1994           60 :         return rc;
    1995              :     }
    1996              :     
    1997              :     // from this point I can be sure that insert_name is not sentinel
    1998              :     
    1999              :     // handle the case where a row is being resurrected (e.g., after a delete, a new insert for the same row)
    2000              :     // odd causal lengths can "resurrect" rows
    2001        44985 :     bool needs_resurrect = (insert_cl > local_cl && insert_cl % 2 == 1);
    2002        44985 :     bool row_exists_locally = local_cl != 0;
    2003              :    
    2004              :     // if a resurrection is needed, insert a sentinel to mark the row as alive
    2005              :     // this handles out-of-order deliveries where the row was deleted and is now being re-inserted
    2006        44985 :     if (needs_resurrect && (row_exists_locally || (!row_exists_locally && insert_cl > 1))) {
    2007            0 :         int rc = merge_sentinel_only_insert(data, table, insert_pk, insert_pk_len, insert_cl,
    2008            0 :                                             insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
    2009            0 :         if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to perform merge_sentinel_only_insert", rc);
    2010            0 :     }
    2011              :     
    2012              :     // at this point, we determine whether the incoming change wins based on causal length
    2013              :     // this can be due to a resurrection, a non-existent local row, or a conflict resolution
    2014        44985 :     bool flag = false;
    2015        44985 :     int rc = merge_did_cid_win(data, table, insert_pk, insert_pk_len, insert_value, insert_site_id, insert_site_id_len, insert_name, insert_col_version, &flag);
    2016        44985 :     if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to perform merge_did_cid_win", rc);
    2017              :     
    2018              :     // check if the incoming change wins and should be applied
    2019        44985 :     bool does_cid_win = ((needs_resurrect) || (!row_exists_locally) || (flag));
    2020        44985 :     if (!does_cid_win) return DBRES_OK;
    2021              : 
    2022              :     // Block-level LWW: if the incoming col_name is a block entry (contains \x1F),
    2023              :     // bypass the normal base-table write and instead store the value in the blocks table.
    2024              :     // The base table column will be materialized from all alive blocks.
    2025        21707 :     if (block_is_block_colname(insert_name) && table->has_block_cols) {
    2026              :         // Store or delete block value in blocks table depending on tombstone status
    2027          412 :         if (insert_col_version % 2 == 0) {
    2028              :             // Tombstone: remove from blocks table
    2029           33 :             rc = block_delete_value(data, table, insert_pk, insert_pk_len, insert_name);
    2030           33 :         } else {
    2031          379 :             rc = block_store_value(data, table, insert_pk, insert_pk_len, insert_name, insert_value);
    2032              :         }
    2033          412 :         if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to store/delete block value", rc);
    2034              : 
    2035              :         // Set winner clock in metadata
    2036          824 :         rc = merge_set_winner_clock(data, table, insert_pk, insert_pk_len, insert_name,
    2037          412 :                                     insert_col_version, insert_db_version,
    2038          412 :                                     insert_site_id, insert_site_id_len, insert_seq, rowid);
    2039          412 :         if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to set winner clock for block", rc);
    2040              : 
    2041              :         // Materialize the full column from blocks into the base table
    2042          412 :         char *base_col = block_extract_base_colname(insert_name);
    2043          412 :         if (base_col) {
    2044          412 :             rc = block_materialize_column(data, table, insert_pk, insert_pk_len, base_col);
    2045          412 :             cloudsync_memory_free(base_col);
    2046          412 :             if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to materialize block column", rc);
    2047          412 :         }
    2048              : 
    2049          412 :         return DBRES_OK;
    2050              :     }
    2051              : 
    2052              :     // perform the final column insert or update if the incoming change wins
    2053        21295 :     if (data->pending_batch) {
    2054              :         // Propagate row_exists_locally to the batch on the first winning column.
    2055              :         // This lets merge_flush_pending choose UPDATE vs INSERT ON CONFLICT,
    2056              :         // which matters when RLS policies reference columns not in the payload.
    2057        21278 :         if (data->pending_batch->table == NULL) {
    2058         7658 :             data->pending_batch->row_exists = row_exists_locally;
    2059         7658 :         }
    2060        21278 :         rc = merge_pending_add(data, table, insert_pk, insert_pk_len, insert_name, insert_value, insert_col_version, insert_db_version, insert_site_id, insert_site_id_len, insert_seq);
    2061        21278 :         if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_pending_add", rc);
    2062        21278 :     } else {
    2063           17 :         rc = merge_insert_col(data, table, insert_pk, insert_pk_len, insert_name, insert_value, insert_col_version, insert_db_version, insert_site_id, insert_site_id_len, insert_seq, rowid);
    2064           17 :         if (rc != DBRES_OK) cloudsync_set_error(data, "Unable to perform merge_insert_col", rc);
    2065              :     }
    2066              : 
    2067        21295 :     return rc;
    2068        46004 : }
    2069              : 
    2070              : // MARK: - Block column setup -
    2071              : 
    2072           69 : int cloudsync_setup_block_column (cloudsync_context *data, const char *table_name, const char *col_name, const char *delimiter) {
    2073           69 :     cloudsync_table_context *table = table_lookup(data, table_name);
    2074           69 :     if (!table) return cloudsync_set_error(data, "cloudsync_setup_block_column: table not found", DBRES_ERROR);
    2075              : 
    2076              :     // Find column index
    2077           69 :     int col_idx = table_col_index(table, col_name);
    2078           69 :     if (col_idx < 0) {
    2079              :         char buf[1024];
    2080            0 :         snprintf(buf, sizeof(buf), "cloudsync_setup_block_column: column '%s' not found in table '%s'", col_name, table_name);
    2081            0 :         return cloudsync_set_error(data, buf, DBRES_ERROR);
    2082              :     }
    2083              : 
    2084              :     // Set column algo
    2085           69 :     table->col_algo[col_idx] = col_algo_block;
    2086           69 :     table->has_block_cols = true;
    2087              : 
    2088              :     // Set delimiter (can be NULL for default)
    2089           69 :     if (table->col_delimiter[col_idx]) {
    2090            0 :         cloudsync_memory_free(table->col_delimiter[col_idx]);
    2091            0 :         table->col_delimiter[col_idx] = NULL;
    2092            0 :     }
    2093           69 :     if (delimiter) {
    2094            0 :         table->col_delimiter[col_idx] = cloudsync_string_dup(delimiter);
    2095            0 :     }
    2096              : 
    2097              :     // Create blocks table if not already done
    2098           69 :     if (!table->blocks_ref) {
    2099           67 :         table->blocks_ref = database_build_blocks_ref(table->schema, table->name);
    2100           67 :         if (!table->blocks_ref) return DBRES_NOMEM;
    2101              : 
    2102              :         // CREATE TABLE IF NOT EXISTS
    2103           67 :         char *sql = cloudsync_memory_mprintf(SQL_BLOCKS_CREATE_TABLE, table->blocks_ref);
    2104           67 :         if (!sql) return DBRES_NOMEM;
    2105              : 
    2106           67 :         int rc = database_exec(data, sql);
    2107           67 :         cloudsync_memory_free(sql);
    2108           67 :         if (rc != DBRES_OK) return cloudsync_set_error(data, "Unable to create blocks table", rc);
    2109              : 
    2110              :         // Prepare block statements
    2111              :         // Write: upsert into blocks (pk, col_name, col_value)
    2112           67 :         sql = cloudsync_memory_mprintf(SQL_BLOCKS_UPSERT, table->blocks_ref);
    2113           67 :         if (!sql) return DBRES_NOMEM;
    2114           67 :         rc = databasevm_prepare(data, sql, (void **)&table->block_value_write_stmt, DBFLAG_PERSISTENT);
    2115           67 :         cloudsync_memory_free(sql);
    2116           67 :         if (rc != DBRES_OK) return rc;
    2117              : 
    2118              :         // Read: SELECT col_value FROM blocks WHERE pk = ? AND col_name = ?
    2119           67 :         sql = cloudsync_memory_mprintf(SQL_BLOCKS_SELECT, table->blocks_ref);
    2120           67 :         if (!sql) return DBRES_NOMEM;
    2121           67 :         rc = databasevm_prepare(data, sql, (void **)&table->block_value_read_stmt, DBFLAG_PERSISTENT);
    2122           67 :         cloudsync_memory_free(sql);
    2123           67 :         if (rc != DBRES_OK) return rc;
    2124              : 
    2125              :         // Delete: DELETE FROM blocks WHERE pk = ? AND col_name = ?
    2126           67 :         sql = cloudsync_memory_mprintf(SQL_BLOCKS_DELETE, table->blocks_ref);
    2127           67 :         if (!sql) return DBRES_NOMEM;
    2128           67 :         rc = databasevm_prepare(data, sql, (void **)&table->block_value_delete_stmt, DBFLAG_PERSISTENT);
    2129           67 :         cloudsync_memory_free(sql);
    2130           67 :         if (rc != DBRES_OK) return rc;
    2131              : 
    2132              :         // List alive blocks for materialization
    2133           67 :         sql = cloudsync_memory_mprintf(SQL_BLOCKS_LIST_ALIVE, table->blocks_ref, table->meta_ref);
    2134           67 :         if (!sql) return DBRES_NOMEM;
    2135           67 :         rc = databasevm_prepare(data, sql, (void **)&table->block_list_stmt, DBFLAG_PERSISTENT);
    2136           67 :         cloudsync_memory_free(sql);
    2137           67 :         if (rc != DBRES_OK) return rc;
    2138           67 :     }
    2139              : 
    2140              :     // Persist settings
    2141           69 :     int rc = dbutils_table_settings_set_key_value(data, table_name, col_name, "algo", "block");
    2142           69 :     if (rc != DBRES_OK) return rc;
    2143              : 
    2144           69 :     if (delimiter) {
    2145            0 :         rc = dbutils_table_settings_set_key_value(data, table_name, col_name, "delimiter", delimiter);
    2146            0 :         if (rc != DBRES_OK) return rc;
    2147            0 :     }
    2148              : 
    2149           69 :     return DBRES_OK;
    2150           69 : }
    2151              : 
    2152              : // MARK: - Private -
    2153              : 
    2154          193 : bool cloudsync_config_exists (cloudsync_context *data) {
    2155          193 :     return database_internal_table_exists(data, CLOUDSYNC_SITEID_NAME) == true;
    2156              : }
    2157              : 
    2158          207 : cloudsync_context *cloudsync_context_create (void *db) {
    2159          207 :     cloudsync_context *data = (cloudsync_context *)cloudsync_memory_zeroalloc((uint64_t)(sizeof(cloudsync_context)));
    2160          207 :     if (!data) return NULL;
    2161              :     DEBUG_SETTINGS("cloudsync_context_create %p", data);
    2162              :     
    2163          207 :     data->libversion = CLOUDSYNC_VERSION;
    2164          207 :     data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
    2165              :     #if CLOUDSYNC_DEBUG
    2166              :     data->debug = 1;
    2167              :     #endif
    2168              :     
    2169              :     // allocate space for 64 tables (it can grow if needed)
    2170          207 :     uint64_t mem_needed = (uint64_t)(CLOUDSYNC_INIT_NTABLES * sizeof(cloudsync_table_context *));
    2171          207 :     data->tables = (cloudsync_table_context **)cloudsync_memory_zeroalloc(mem_needed);
    2172          207 :     if (!data->tables) {cloudsync_memory_free(data); return NULL;}
    2173              :     
    2174          207 :     data->tables_cap = CLOUDSYNC_INIT_NTABLES;
    2175          207 :     data->tables_count = 0;
    2176          207 :     data->db = db;
    2177              :     
    2178              :     // SQLite exposes col_value as ANY, but other databases require a concrete type.
    2179              :     // In PostgreSQL we expose col_value as bytea, which holds the pk-encoded value bytes (type + data).
    2180              :     // Because col_value is already encoded, we skip decoding this field and pass it through as bytea.
    2181              :     // It is decoded to the target column type just before applying changes to the base table.
    2182          207 :     data->skip_decode_idx = (db == NULL) ? CLOUDSYNC_PK_INDEX_COLVALUE : -1;
    2183              : 
    2184          207 :     return data;
    2185          207 : }
    2186              : 
    2187          207 : void cloudsync_context_free (void *ctx) {
    2188          207 :     cloudsync_context *data = (cloudsync_context *)ctx;
    2189              :     DEBUG_SETTINGS("cloudsync_context_free %p", data);
    2190          207 :     if (!data) return;
    2191              : 
    2192              :     // free all table contexts and prepared statements
    2193          207 :     cloudsync_terminate(data);
    2194              : 
    2195          207 :     cloudsync_memory_free(data->tables);
    2196          207 :     cloudsync_memory_free(data);
    2197          207 : }
    2198              : 
    2199          295 : const char *cloudsync_context_init (cloudsync_context *data) {
    2200          295 :     if (!data) return NULL;
    2201              :     
    2202              :     // perform init just the first time, if the site_id field is not set.
    2203              :     // The data->site_id value could exists while settings tables don't exists if the
    2204              :     // cloudsync_context_init was previously called in init transaction that was rolled back
    2205              :     // because of an error during the init process.
    2206          295 :     if (data->site_id[0] == 0 || !database_internal_table_exists(data, CLOUDSYNC_SITEID_NAME)) {
    2207          187 :         if (dbutils_settings_init(data) != DBRES_OK) return NULL;
    2208          187 :         if (cloudsync_add_dbvms(data) != DBRES_OK) return NULL;
    2209          187 :         if (cloudsync_load_siteid(data) != DBRES_OK) return NULL;
    2210          187 :         data->schema_hash = database_schema_hash(data);
    2211          187 :     }
    2212              :     
    2213          295 :     return (const char *)data->site_id;
    2214          295 : }
    2215              : 
    2216         1235 : void cloudsync_sync_key (cloudsync_context *data, const char *key, const char *value) {
    2217              :     DEBUG_SETTINGS("cloudsync_sync_key key: %s value: %s", key, value);
    2218              :     
    2219              :     // sync data
    2220         1235 :     if (strcmp(key, CLOUDSYNC_KEY_SCHEMAVERSION) == 0) {
    2221          187 :         data->schema_version = (int)strtol(value, NULL, 0);
    2222          187 :         return;
    2223              :     }
    2224              :     
    2225         1048 :     if (strcmp(key, CLOUDSYNC_KEY_DEBUG) == 0) {
    2226            0 :         data->debug = 0;
    2227            0 :         if (value && (value[0] != 0) && (value[0] != '0')) data->debug = 1;
    2228            0 :         return;
    2229              :     }
    2230              : 
    2231         1048 :     if (strcmp(key, CLOUDSYNC_KEY_SCHEMA) == 0) {
    2232            0 :         cloudsync_set_schema(data, value);
    2233            0 :         return;
    2234              :     }
    2235         1235 : }
    2236              : 
    2237              : #if 0
    2238              : void cloudsync_sync_table_key(cloudsync_context *data, const char *table, const char *column, const char *key, const char *value) {
    2239              :     DEBUG_SETTINGS("cloudsync_sync_table_key table: %s column: %s key: %s value: %s", table, column, key, value);
    2240              :     // Unused in this version
    2241              :     return;
    2242              : }
    2243              : #endif
    2244              : 
    2245         7686 : int cloudsync_commit_hook (void *ctx) {
    2246         7686 :     cloudsync_context *data = (cloudsync_context *)ctx;
    2247              :     
    2248         7686 :     data->db_version = data->pending_db_version;
    2249         7686 :     data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
    2250         7686 :     data->seq = 0;
    2251              :     
    2252         7686 :     return DBRES_OK;
    2253              : }
    2254              : 
    2255            3 : void cloudsync_rollback_hook (void *ctx) {
    2256            3 :     cloudsync_context *data = (cloudsync_context *)ctx;
    2257              :     
    2258            3 :     data->pending_db_version = CLOUDSYNC_VALUE_NOTSET;
    2259            3 :     data->seq = 0;
    2260            3 : }
    2261              : 
    2262           24 : int cloudsync_begin_alter (cloudsync_context *data, const char *table_name) {
    2263              :     // init cloudsync_settings
    2264           24 :     if (cloudsync_context_init(data) == NULL) {
    2265            0 :         return DBRES_MISUSE;
    2266              :     }
    2267              :     
    2268              :     // lookup table
    2269           24 :     cloudsync_table_context *table = table_lookup(data, table_name);
    2270           24 :     if (!table) {
    2271              :         char buffer[1024];
    2272            1 :         snprintf(buffer, sizeof(buffer), "Unable to find table %s", table_name);
    2273            1 :         return cloudsync_set_error(data, buffer, DBRES_MISUSE);
    2274              :     }
    2275              :     
    2276              :     // create a savepoint to manage the alter operations as a transaction
    2277           23 :     int rc = database_begin_savepoint(data, "cloudsync_alter");
    2278           23 :     if (rc != DBRES_OK) {
    2279            0 :         return cloudsync_set_error(data, "Unable to create cloudsync_begin_alter savepoint", DBRES_MISUSE);
    2280              :     }
    2281              :     
    2282              :     // retrieve primary key(s)
    2283           23 :     char **names = NULL;
    2284           23 :     int nrows = 0;
    2285           23 :     rc = database_pk_names(data, table_name, &names, &nrows);
    2286           23 :     if (rc != DBRES_OK) {
    2287              :         char buffer[1024];
    2288            0 :         snprintf(buffer, sizeof(buffer), "Unable to get primary keys for table %s", table_name);
    2289            0 :         cloudsync_set_error(data, buffer, DBRES_MISUSE);
    2290            0 :         goto rollback_begin_alter;
    2291              :     }
    2292              :     
    2293              :     // sanity check the number of primary keys
    2294           23 :     if (nrows != table_count_pks(table)) {
    2295              :         char buffer[1024];
    2296            0 :         snprintf(buffer, sizeof(buffer), "Number of primary keys for table %s changed before ALTER", table_name);
    2297            0 :         cloudsync_set_error(data, buffer, DBRES_MISUSE);
    2298            0 :         goto rollback_begin_alter;
    2299              :     }
    2300              :     
    2301              :     // drop original triggers
    2302           23 :     rc = database_delete_triggers(data, table_name);
    2303           23 :     if (rc != DBRES_OK) {
    2304              :         char buffer[1024];
    2305            0 :         snprintf(buffer, sizeof(buffer), "Unable to delete triggers for table %s in cloudsync_begin_alter.", table_name);
    2306            0 :         cloudsync_set_error(data, buffer, DBRES_ERROR);
    2307            0 :         goto rollback_begin_alter;
    2308              :     }
    2309              :     
    2310           23 :     table_set_pknames(table, names);
    2311           23 :     return DBRES_OK;
    2312              :     
    2313              : rollback_begin_alter:
    2314            0 :     database_rollback_savepoint(data, "cloudsync_alter");
    2315            0 :     if (names) table_pknames_free(names, nrows);
    2316            0 :     return rc;
    2317           24 : }
    2318              : 
    2319           23 : int cloudsync_finalize_alter (cloudsync_context *data, cloudsync_table_context *table) {
    2320              :     // check if dbversion needed to be updated
    2321           23 :     cloudsync_dbversion_check_uptodate(data);
    2322              : 
    2323              :     // if primary-key columns change, all row identities change.
    2324              :     // In that case, the clock table must be dropped, recreated,
    2325              :     // and backfilled. We detect this by comparing the unique index
    2326              :     // in the lookaside table with the source table's PKs.
    2327              :     
    2328              :     // retrieve primary keys (to check is they changed)
    2329           23 :     char **result = NULL;
    2330           23 :     int nrows = 0;
    2331           23 :     int rc = database_pk_names (data, table->name, &result, &nrows);
    2332           23 :     if (rc != DBRES_OK || nrows == 0) {
    2333            0 :         if (nrows == 0) rc = DBRES_MISUSE;
    2334            0 :         goto finalize;
    2335              :     }
    2336              :     
    2337              :     // check if there are differences
    2338           23 :     bool pk_diff = (nrows != table->npks);
    2339           23 :     if (!pk_diff) {
    2340           45 :         for (int i = 0; i < nrows; ++i) {
    2341           34 :             if (strcmp(table->pk_name[i], result[i]) != 0) {
    2342            6 :                 pk_diff = true;
    2343            6 :                 break;
    2344              :             }
    2345           28 :         }
    2346           17 :     }
    2347              :     
    2348           23 :     if (pk_diff) {
    2349              :         // drop meta-table, it will be recreated
    2350           12 :         char *sql = cloudsync_memory_mprintf(SQL_DROP_CLOUDSYNC_TABLE, table->meta_ref);
    2351           12 :         rc = database_exec(data, sql);
    2352           12 :         cloudsync_memory_free(sql);
    2353           12 :         if (rc != DBRES_OK) {
    2354            0 :             DEBUG_DBERROR(rc, "cloudsync_finalize_alter", data);
    2355            0 :             goto finalize;
    2356              :         }
    2357           12 :     } else {
    2358              :         // compact meta-table
    2359              :         // delete entries for removed columns
    2360           11 :         const char *schema = table->schema ? table->schema : "";
    2361           11 :         char *sql = sql_build_delete_cols_not_in_schema_query(schema, table->name, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
    2362           11 :         rc = database_exec(data, sql);
    2363           11 :         cloudsync_memory_free(sql);
    2364           11 :         if (rc != DBRES_OK) {
    2365            0 :             DEBUG_DBERROR(rc, "cloudsync_finalize_alter", data);
    2366            0 :             goto finalize;
    2367              :         }
    2368              :         
    2369           11 :         sql = sql_build_pk_qualified_collist_query(schema, table->name);
    2370           11 :         if (!sql) {rc = DBRES_NOMEM; goto finalize;}
    2371              :         
    2372           11 :         char *pkclause = NULL;
    2373           11 :         rc = database_select_text(data, sql, &pkclause);
    2374           11 :         cloudsync_memory_free(sql);
    2375           11 :         if (rc != DBRES_OK) goto finalize;
    2376           11 :         char *pkvalues = (pkclause) ? pkclause : "rowid";
    2377              :         
    2378              :         // delete entries related to rows that no longer exist in the original table, but preserve tombstone
    2379           11 :         sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_GC_DELETE_ORPHANED_PK, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, CLOUDSYNC_TOMBSTONE_VALUE, table->base_ref, table->meta_ref, pkvalues);
    2380           11 :         rc = database_exec(data, sql);
    2381           11 :         if (pkclause) cloudsync_memory_free(pkclause);
    2382           11 :         cloudsync_memory_free(sql);
    2383           11 :         if (rc != DBRES_OK) {
    2384            0 :             DEBUG_DBERROR(rc, "cloudsync_finalize_alter", data);
    2385            0 :             goto finalize;
    2386              :         }
    2387              : 
    2388              :     }
    2389              :     
    2390              :     // update key to be later used in cloudsync_dbversion_rebuild
    2391              :     char buf[256];
    2392           23 :     snprintf(buf, sizeof(buf), "%" PRId64, data->db_version);
    2393           23 :     dbutils_settings_set_key_value(data, "pre_alter_dbversion", buf);
    2394              :     
    2395              : finalize:
    2396           23 :     table_pknames_free(result, nrows);
    2397           23 :     return rc;
    2398              : }
    2399              : 
    2400           24 : int cloudsync_commit_alter (cloudsync_context *data, const char *table_name) {
    2401           24 :     int rc = DBRES_MISUSE;
    2402           24 :     cloudsync_table_context *table = NULL;
    2403              :     
    2404              :     // init cloudsync_settings
    2405           24 :     if (cloudsync_context_init(data) == NULL) {
    2406            0 :         cloudsync_set_error(data, "Unable to initialize cloudsync context", DBRES_MISUSE);
    2407            0 :         goto rollback_finalize_alter;
    2408              :     }
    2409              :     
    2410              :     // lookup table
    2411           24 :     table = table_lookup(data, table_name);
    2412           24 :     if (!table) {
    2413              :         char buffer[1024];
    2414            1 :         snprintf(buffer, sizeof(buffer), "Unable to find table %s", table_name);
    2415            1 :         cloudsync_set_error(data, buffer, DBRES_MISUSE);
    2416            1 :         goto rollback_finalize_alter;
    2417              :     }
    2418              :     
    2419           23 :     rc = cloudsync_finalize_alter(data, table);
    2420           23 :     if (rc != DBRES_OK) goto rollback_finalize_alter;
    2421              :     
    2422              :     // the table is outdated, delete it and it will be reloaded in the cloudsync_init_internal
    2423           23 :     table_remove(data, table);
    2424           23 :     table_free(table);
    2425           23 :     table = NULL;
    2426              :         
    2427              :     // init again cloudsync for the table
    2428           23 :     table_algo algo_current = dbutils_table_settings_get_algo(data, table_name);
    2429           23 :     if (algo_current == table_algo_none) algo_current = dbutils_table_settings_get_algo(data, "*");
    2430           23 :     rc = cloudsync_init_table(data, table_name, cloudsync_algo_name(algo_current), true);
    2431           23 :     if (rc != DBRES_OK) goto rollback_finalize_alter;
    2432              : 
    2433              :     // release savepoint
    2434           23 :     rc = database_commit_savepoint(data, "cloudsync_alter");
    2435           23 :     if (rc != DBRES_OK) {
    2436            0 :         cloudsync_set_dberror(data);
    2437            0 :         goto rollback_finalize_alter;
    2438              :     }
    2439              :     
    2440           23 :     cloudsync_update_schema_hash(data);
    2441           23 :     return DBRES_OK;
    2442              :     
    2443              : rollback_finalize_alter:
    2444            1 :     database_rollback_savepoint(data, "cloudsync_alter");
    2445            1 :     if (table) table_set_pknames(table, NULL);
    2446            1 :     return rc;
    2447           24 : }
    2448              : 
    2449              : // MARK: - Filter Rewrite -
    2450              : 
    2451              : // Replace bare column names in a filter expression with prefix-qualified names.
    2452              : // E.g., filter="user_id = 42", prefix="NEW", columns=["user_id","id"] → "NEW.\"user_id\" = 42"
    2453              : // Columns must be sorted by length descending by the caller to avoid partial matches.
    2454              : // Skips content inside single-quoted string literals.
    2455              : // Returns a newly allocated string (caller must free with cloudsync_memory_free), or NULL on error.
    2456              : // Helper: check if an identifier token matches a column name.
    2457            8 : static bool filter_is_column (const char *token, size_t token_len, char **columns, int ncols) {
    2458           28 :     for (int i = 0; i < ncols; ++i) {
    2459           24 :         if (strlen(columns[i]) == token_len && strncmp(token, columns[i], token_len) == 0)
    2460            4 :             return true;
    2461           20 :     }
    2462            4 :     return false;
    2463            8 : }
    2464              : 
    2465              : // Helper: check if character is part of a SQL identifier.
    2466           56 : static bool filter_is_ident_char (char c) {
    2467           92 :     return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') ||
    2468           36 :            (c >= '0' && c <= '9') || c == '_';
    2469              : }
    2470              : 
    2471            4 : char *cloudsync_filter_add_row_prefix (const char *filter, const char *prefix, char **columns, int ncols) {
    2472            4 :     if (!filter || !prefix || !columns || ncols <= 0) return NULL;
    2473              : 
    2474            4 :     size_t filter_len = strlen(filter);
    2475            4 :     size_t prefix_len = strlen(prefix);
    2476              : 
    2477              :     // Each identifier match grows by at most (prefix_len + 3) bytes.
    2478              :     // Worst case: the entire filter is one repeated column reference separated by
    2479              :     // single characters, so up to (filter_len / 2) matches.  Use a safe upper bound.
    2480            4 :     size_t max_growth = (filter_len / 2 + 1) * (prefix_len + 3);
    2481            4 :     size_t cap = filter_len + max_growth + 64;
    2482            4 :     char *result = (char *)cloudsync_memory_alloc(cap);
    2483            4 :     if (!result) return NULL;
    2484            4 :     size_t out = 0;
    2485              : 
    2486              :     // Single pass: tokenize into identifiers, quoted strings, and everything else.
    2487            4 :     size_t i = 0;
    2488           24 :     while (i < filter_len) {
    2489              :         // Skip single-quoted string literals verbatim (handle '' escape)
    2490           20 :         if (filter[i] == '\'') {
    2491            0 :             result[out++] = filter[i++];
    2492            0 :             while (i < filter_len) {
    2493            0 :                 if (filter[i] == '\'') {
    2494            0 :                     result[out++] = filter[i++];
    2495              :                     // '' is an escaped quote — keep going
    2496            0 :                     if (i < filter_len && filter[i] == '\'') {
    2497            0 :                         result[out++] = filter[i++];
    2498            0 :                         continue;
    2499              :                     }
    2500            0 :                     break; // single ' ends the literal
    2501              :                 }
    2502            0 :                 result[out++] = filter[i++];
    2503              :             }
    2504            0 :             continue;
    2505              :         }
    2506              : 
    2507              :         // Extract identifier token
    2508           20 :         if (filter_is_ident_char(filter[i])) {
    2509            8 :             size_t start = i;
    2510           40 :             while (i < filter_len && filter_is_ident_char(filter[i])) ++i;
    2511            8 :             size_t token_len = i - start;
    2512              : 
    2513            8 :             if (filter_is_column(&filter[start], token_len, columns, ncols)) {
    2514              :                 // Emit PREFIX."column_name"
    2515            4 :                 memcpy(&result[out], prefix, prefix_len); out += prefix_len;
    2516            4 :                 result[out++] = '.';
    2517            4 :                 result[out++] = '"';
    2518            4 :                 memcpy(&result[out], &filter[start], token_len); out += token_len;
    2519            4 :                 result[out++] = '"';
    2520            4 :             } else {
    2521              :                 // Not a column — copy as-is
    2522            4 :                 memcpy(&result[out], &filter[start], token_len); out += token_len;
    2523              :             }
    2524            8 :             continue;
    2525              :         }
    2526              : 
    2527              :         // Any other character — copy as-is
    2528           12 :         result[out++] = filter[i++];
    2529              :     }
    2530              : 
    2531            4 :     result[out] = '\0';
    2532            4 :     return result;
    2533            4 : }
    2534              : 
    2535          243 : int cloudsync_refill_metatable (cloudsync_context *data, const char *table_name) {
    2536          243 :     cloudsync_table_context *table = table_lookup(data, table_name);
    2537          243 :     if (!table) return DBRES_ERROR;
    2538              : 
    2539          243 :     dbvm_t *vm = NULL;
    2540          243 :     int64_t db_version = cloudsync_dbversion_next(data, CLOUDSYNC_VALUE_NOTSET);
    2541              : 
    2542              :     // Read row-level filter from settings (if any)
    2543              :     char filter_buf[2048];
    2544          243 :     int frc = dbutils_table_settings_get_value(data, table_name, "*", "filter", filter_buf, sizeof(filter_buf));
    2545          243 :     const char *filter = (frc == DBRES_OK && filter_buf[0]) ? filter_buf : NULL;
    2546              : 
    2547          243 :     const char *schema = table->schema ? table->schema : "";
    2548          243 :     char *sql = sql_build_pk_collist_query(schema, table_name);
    2549          243 :     char *pkclause_identifiers = NULL;
    2550          243 :     int rc = database_select_text(data, sql, &pkclause_identifiers);
    2551          243 :     cloudsync_memory_free(sql);
    2552          243 :     if (rc != DBRES_OK) goto finalize;
    2553          243 :     char *pkvalues_identifiers = (pkclause_identifiers) ? pkclause_identifiers : "rowid";
    2554              : 
    2555              :     // Use database-specific query builder to handle type differences in composite PKs
    2556          243 :     sql = sql_build_insert_missing_pks_query(schema, table_name, pkvalues_identifiers, table->base_ref, table->meta_ref, filter);
    2557          243 :     if (!sql) {rc = DBRES_NOMEM; goto finalize;}
    2558          243 :     rc = database_exec(data, sql);
    2559          243 :     cloudsync_memory_free(sql);
    2560          243 :     if (rc != DBRES_OK) goto finalize;
    2561              : 
    2562              :     // fill missing colums
    2563              :     // for each non-pk column:
    2564              :     // The new query does 1 encode per source row and one indexed NOT-EXISTS probe.
    2565              :     // The old plan does many decodes per candidate and can't use an index to rule out matches quickly—so it burns CPU and I/O.
    2566              : 
    2567          243 :     if (filter) {
    2568            0 :         sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL_FILTERED, pkvalues_identifiers, table->base_ref, filter, table->meta_ref);
    2569            0 :     } else {
    2570          243 :         sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL, pkvalues_identifiers, table->base_ref, table->meta_ref);
    2571              :     }
    2572          243 :     rc = databasevm_prepare(data, sql, (void **)&vm, DBFLAG_PERSISTENT);
    2573          243 :     cloudsync_memory_free(sql);
    2574          243 :     if (rc != DBRES_OK) goto finalize;
    2575              :      
    2576         1270 :     for (int i=0; i<table->ncols; ++i) {
    2577         1027 :         char *col_name = table->col_name[i];
    2578              : 
    2579         1027 :         rc = databasevm_bind_text(vm, 1, col_name, -1);
    2580         1027 :         if (rc != DBRES_OK) goto finalize;
    2581              : 
    2582         1065 :         while (1) {
    2583         1065 :             rc = databasevm_step(vm);
    2584         1065 :             if (rc == DBRES_ROW) {
    2585           38 :                 size_t pklen = 0;
    2586           38 :                 const void *pk = (const char *)database_column_blob(vm, 0, &pklen);
    2587           38 :                 if (!pk) { rc = DBRES_ERROR; break; }
    2588           38 :                 rc = local_mark_insert_or_update_meta(table, pk, pklen, col_name, db_version, cloudsync_bumpseq(data));
    2589         1065 :             } else if (rc == DBRES_DONE) {
    2590         1027 :                 rc = DBRES_OK;
    2591         1027 :                 break;
    2592              :             } else {
    2593            0 :                 break;
    2594              :             }
    2595              :         }
    2596         1027 :         if (rc != DBRES_OK) goto finalize;
    2597              : 
    2598         1027 :         databasevm_reset(vm);
    2599         1270 :     }
    2600              :     
    2601              : finalize:
    2602          243 :     if (rc != DBRES_OK) {DEBUG_ALWAYS("cloudsync_refill_metatable error: %s", database_errmsg(data));}
    2603          243 :     if (pkclause_identifiers) cloudsync_memory_free(pkclause_identifiers);
    2604          243 :     if (vm) databasevm_finalize(vm);
    2605          243 :     return rc;
    2606          243 : }
    2607              : 
    2608              : // MARK: - Local -
    2609              : 
    2610            3 : int local_update_sentinel (cloudsync_table_context *table, const void *pk, size_t pklen, int64_t db_version, int seq) {
    2611            3 :     dbvm_t *vm = table->meta_sentinel_update_stmt;
    2612            3 :     if (!vm) return -1;
    2613              :     
    2614            3 :     int rc = databasevm_bind_int(vm, 1, db_version);
    2615            3 :     if (rc != DBRES_OK) goto cleanup;
    2616              :     
    2617            3 :     rc = databasevm_bind_int(vm, 2, seq);
    2618            3 :     if (rc != DBRES_OK) goto cleanup;
    2619              :     
    2620            3 :     rc = databasevm_bind_blob(vm, 3, pk, (int)pklen);
    2621            3 :     if (rc != DBRES_OK) goto cleanup;
    2622              :     
    2623            3 :     rc = databasevm_step(vm);
    2624            3 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    2625              :     
    2626              : cleanup:
    2627            3 :     DEBUG_DBERROR(rc, "local_update_sentinel", table->context);
    2628            3 :     databasevm_reset(vm);
    2629            3 :     return rc;
    2630            3 : }
    2631              : 
    2632          128 : int local_mark_insert_sentinel_meta (cloudsync_table_context *table, const void *pk, size_t pklen, int64_t db_version, int seq) {
    2633          128 :     dbvm_t *vm = table->meta_sentinel_insert_stmt;
    2634          128 :     if (!vm) return -1;
    2635              :     
    2636          128 :     int rc = databasevm_bind_blob(vm, 1, pk, (int)pklen);
    2637          128 :     if (rc != DBRES_OK) goto cleanup;
    2638              :     
    2639          128 :     rc = databasevm_bind_int(vm, 2, db_version);
    2640          128 :     if (rc != DBRES_OK) goto cleanup;
    2641              :     
    2642          128 :     rc = databasevm_bind_int(vm, 3, seq);
    2643          128 :     if (rc != DBRES_OK) goto cleanup;
    2644              :     
    2645          128 :     rc = databasevm_bind_int(vm, 4, db_version);
    2646          128 :     if (rc != DBRES_OK) goto cleanup;
    2647              :     
    2648          128 :     rc = databasevm_bind_int(vm, 5, seq);
    2649          128 :     if (rc != DBRES_OK) goto cleanup;
    2650              :     
    2651          128 :     rc = databasevm_step(vm);
    2652          128 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    2653              :     
    2654              : cleanup:
    2655          128 :     DEBUG_DBERROR(rc, "local_insert_sentinel", table->context);
    2656          128 :     databasevm_reset(vm);
    2657          128 :     return rc;
    2658          128 : }
    2659              : 
    2660        11742 : int local_mark_insert_or_update_meta_impl (cloudsync_table_context *table, const void *pk, size_t pklen, const char *col_name, int col_version, int64_t db_version, int seq) {
    2661              :     
    2662        11742 :     dbvm_t *vm = table->meta_row_insert_update_stmt;
    2663        11742 :     if (!vm) return -1;
    2664              :     
    2665        11742 :     int rc = databasevm_bind_blob(vm, 1, pk, pklen);
    2666        11742 :     if (rc != DBRES_OK) goto cleanup;
    2667              :     
    2668        11742 :     rc = databasevm_bind_text(vm, 2, (col_name) ? col_name : CLOUDSYNC_TOMBSTONE_VALUE, -1);
    2669        11742 :     if (rc != DBRES_OK) goto cleanup;
    2670              :     
    2671        11742 :     rc = databasevm_bind_int(vm, 3, col_version);
    2672        11742 :     if (rc != DBRES_OK) goto cleanup;
    2673              :     
    2674        11742 :     rc = databasevm_bind_int(vm, 4, db_version);
    2675        11742 :     if (rc != DBRES_OK) goto cleanup;
    2676              :     
    2677        11742 :     rc = databasevm_bind_int(vm, 5, seq);
    2678        11742 :     if (rc != DBRES_OK) goto cleanup;
    2679              :     
    2680        11742 :     rc = databasevm_bind_int(vm, 6, db_version);
    2681        11742 :     if (rc != DBRES_OK) goto cleanup;
    2682              :     
    2683        11742 :     rc = databasevm_bind_int(vm, 7, seq);
    2684        11742 :     if (rc != DBRES_OK) goto cleanup;
    2685              :     
    2686        11742 :     rc = databasevm_step(vm);
    2687        11742 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    2688              :     
    2689              : cleanup:
    2690        11742 :     DEBUG_DBERROR(rc, "local_insert_or_update", table->context);
    2691        11742 :     databasevm_reset(vm);
    2692        11742 :     return rc;
    2693        11742 : }
    2694              : 
    2695        11638 : int local_mark_insert_or_update_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const char *col_name, int64_t db_version, int seq) {
    2696        11638 :     return local_mark_insert_or_update_meta_impl(table, pk, pklen, col_name, 1, db_version, seq);
    2697              : }
    2698              : 
    2699           39 : int local_mark_delete_block_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const char *block_colname, int64_t db_version, int seq) {
    2700              :     // Mark a block as deleted by setting col_version = 2 (even = deleted)
    2701           39 :     return local_mark_insert_or_update_meta_impl(table, pk, pklen, block_colname, 2, db_version, seq);
    2702              : }
    2703              : 
    2704           39 : int block_delete_value_external (cloudsync_context *data, cloudsync_table_context *table, const void *pk, size_t pklen, const char *block_colname) {
    2705           39 :     return block_delete_value(data, table, pk, (int)pklen, block_colname);
    2706              : }
    2707              : 
    2708           65 : int local_mark_delete_meta (cloudsync_table_context *table, const void *pk, size_t pklen, int64_t db_version, int seq) {
    2709           65 :     return local_mark_insert_or_update_meta_impl(table, pk, pklen, NULL, 2, db_version, seq);
    2710              : }
    2711              : 
    2712           34 : int local_drop_meta (cloudsync_table_context *table, const void *pk, size_t pklen) {
    2713           34 :     dbvm_t *vm = table->meta_row_drop_stmt;
    2714           34 :     if (!vm) return -1;
    2715              :     
    2716           34 :     int rc = databasevm_bind_blob(vm, 1, pk, pklen);
    2717           34 :     if (rc != DBRES_OK) goto cleanup;
    2718              :     
    2719           34 :     rc = databasevm_step(vm);
    2720           34 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    2721              :     
    2722              : cleanup:
    2723           34 :     DEBUG_DBERROR(rc, "local_drop_meta", table->context);
    2724           34 :     databasevm_reset(vm);
    2725           34 :     return rc;
    2726           34 : }
    2727              : 
    2728           31 : int local_update_move_meta (cloudsync_table_context *table, const void *pk, size_t pklen, const void *pk2, size_t pklen2, int64_t db_version) {
    2729              :     /*
    2730              :       * This function moves non-sentinel metadata entries from an old primary key (OLD.pk)
    2731              :       * to a new primary key (NEW.pk) when a primary key change occurs.
    2732              :       *
    2733              :       * To ensure consistency and proper conflict resolution in a CRDT (Conflict-free Replicated Data Type) system,
    2734              :       * each non-sentinel metadata entry involved in the move must have a unique sequence value (seq).
    2735              :       *
    2736              :       * The `seq` is crucial for tracking the order of operations and for detecting and resolving conflicts
    2737              :       * during synchronization between replicas. Without a unique `seq` for each entry, concurrent updates
    2738              :       * may be applied incorrectly, leading to data inconsistency.
    2739              :       *
    2740              :       * When performing the update, a unique `seq` must be assigned to each metadata row. This can be achieved
    2741              :       * by either incrementing the maximum sequence value in the table or using a function (e.g., cloudsync_bumpseq(data))
    2742              :       * that generates a unique sequence for each row. The update query should ensure that each row moved
    2743              :       * from OLD.pk to NEW.pk gets a distinct `seq` to maintain proper versioning and ordering of changes.
    2744              :      */
    2745              :     
    2746              :     // see https://github.com/sqliteai/sqlite-sync/blob/main/docs/PriKey.md for more details
    2747              :     // pk2 is the old pk
    2748              :     
    2749           31 :     dbvm_t *vm = table->meta_update_move_stmt;
    2750           31 :     if (!vm) return -1;
    2751              :     
    2752              :     // new primary key
    2753           31 :     int rc = databasevm_bind_blob(vm, 1, pk, pklen);
    2754           31 :     if (rc != DBRES_OK) goto cleanup;
    2755              :     
    2756              :     // new db_version
    2757           31 :     rc = databasevm_bind_int(vm, 2, db_version);
    2758           31 :     if (rc != DBRES_OK) goto cleanup;
    2759              :     
    2760              :     // old primary key
    2761           31 :     rc = databasevm_bind_blob(vm, 3, pk2, pklen2);
    2762           31 :     if (rc != DBRES_OK) goto cleanup;
    2763              :     
    2764           31 :     rc = databasevm_step(vm);
    2765           31 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    2766              :     
    2767              : cleanup:
    2768           31 :     DEBUG_DBERROR(rc, "local_update_move_meta", table->context);
    2769           31 :     databasevm_reset(vm);
    2770           31 :     return rc;
    2771           31 : }
    2772              : 
    2773              : // MARK: - Payload Encode / Decode -
    2774              : 
    2775          649 : static void cloudsync_payload_checksum_store (cloudsync_payload_header *header, uint64_t checksum) {
    2776          649 :     uint64_t h = checksum & 0xFFFFFFFFFFFFULL; // keep 48 bits
    2777          649 :     header->checksum[0] = (uint8_t)(h >> 40);
    2778          649 :     header->checksum[1] = (uint8_t)(h >> 32);
    2779          649 :     header->checksum[2] = (uint8_t)(h >> 24);
    2780          649 :     header->checksum[3] = (uint8_t)(h >> 16);
    2781          649 :     header->checksum[4] = (uint8_t)(h >>  8);
    2782          649 :     header->checksum[5] = (uint8_t)(h >>  0);
    2783          649 : }
    2784              : 
    2785          643 : static uint64_t cloudsync_payload_checksum_load (cloudsync_payload_header *header) {
    2786         1929 :     return ((uint64_t)header->checksum[0] << 40) |
    2787         1286 :            ((uint64_t)header->checksum[1] << 32) |
    2788         1286 :            ((uint64_t)header->checksum[2] << 24) |
    2789         1286 :            ((uint64_t)header->checksum[3] << 16) |
    2790         1286 :            ((uint64_t)header->checksum[4] <<  8) |
    2791          643 :            ((uint64_t)header->checksum[5] <<  0);
    2792              : }
    2793              : 
    2794          643 : static bool cloudsync_payload_checksum_verify (cloudsync_payload_header *header, uint64_t checksum) {
    2795          643 :     uint64_t checksum1 = cloudsync_payload_checksum_load(header);
    2796          643 :     uint64_t checksum2 = checksum & 0xFFFFFFFFFFFFULL;
    2797          643 :     return (checksum1 == checksum2);
    2798              : }
    2799              : 
    2800        49096 : static bool cloudsync_payload_encode_check (cloudsync_payload_context *payload, size_t needed) {
    2801        49096 :     if (payload->nrows == 0) needed += sizeof(cloudsync_payload_header);
    2802              :     
    2803              :     // alloc/resize buffer
    2804        49096 :     if (payload->bused + needed > payload->balloc) {
    2805          658 :         if (needed < CLOUDSYNC_PAYLOAD_MINBUF_SIZE) needed = CLOUDSYNC_PAYLOAD_MINBUF_SIZE;
    2806          658 :         size_t balloc = payload->balloc + needed;
    2807              :         
    2808          658 :         char *buffer = cloudsync_memory_realloc(payload->buffer, balloc);
    2809          658 :         if (!buffer) {
    2810            0 :             if (payload->buffer) cloudsync_memory_free(payload->buffer);
    2811            0 :             memset(payload, 0, sizeof(cloudsync_payload_context));
    2812            0 :             return false;
    2813              :         }
    2814              :         
    2815          658 :         payload->buffer = buffer;
    2816          658 :         payload->balloc = balloc;
    2817          658 :         if (payload->nrows == 0) payload->bused = sizeof(cloudsync_payload_header);
    2818          658 :     }
    2819              :     
    2820        49096 :     return true;
    2821        49096 : }
    2822              : 
    2823        50398 : size_t cloudsync_payload_context_size (size_t *header_size) {
    2824        50398 :     if (header_size) *header_size = sizeof(cloudsync_payload_header);
    2825        50398 :     return sizeof(cloudsync_payload_context);
    2826              : }
    2827              : 
    2828          649 : void cloudsync_payload_header_init (cloudsync_payload_header *header, uint32_t expanded_size, uint16_t ncols, uint32_t nrows, uint64_t hash) {
    2829          649 :     memset(header, 0, sizeof(cloudsync_payload_header));
    2830              :     assert(sizeof(cloudsync_payload_header)==32);
    2831              :     
    2832              :     int major, minor, patch;
    2833          649 :     sscanf(CLOUDSYNC_VERSION, "%d.%d.%d", &major, &minor, &patch);
    2834              :     
    2835          649 :     header->signature = htonl(CLOUDSYNC_PAYLOAD_SIGNATURE);
    2836          649 :     header->version = CLOUDSYNC_PAYLOAD_VERSION_2;
    2837          649 :     header->libversion[0] = (uint8_t)major;
    2838          649 :     header->libversion[1] = (uint8_t)minor;
    2839          649 :     header->libversion[2] = (uint8_t)patch;
    2840          649 :     header->expanded_size = htonl(expanded_size);
    2841          649 :     header->ncols = htons(ncols);
    2842          649 :     header->nrows = htonl(nrows);
    2843          649 :     header->schema_hash = htonll(hash);
    2844          649 : }
    2845              : 
    2846        49096 : int cloudsync_payload_encode_step (cloudsync_payload_context *payload, cloudsync_context *data, int argc, dbvalue_t **argv) {
    2847              :     DEBUG_FUNCTION("cloudsync_payload_encode_step");
    2848              :     // debug_values(argc, argv);
    2849              :     
    2850              :     // check if the step function is called for the first time
    2851        49096 :     if (payload->nrows == 0) payload->ncols = (uint16_t)argc;
    2852              :     
    2853        49096 :     size_t breq = pk_encode_size((dbvalue_t **)argv, argc, 0, data->skip_decode_idx);
    2854        49096 :     if (cloudsync_payload_encode_check(payload, breq) == false) {
    2855            0 :         return cloudsync_set_error(data, "Not enough memory to resize payload internal buffer", DBRES_NOMEM);
    2856              :     }
    2857              :     
    2858        49096 :     char *buffer = payload->buffer + payload->bused;
    2859        49096 :     size_t bsize = payload->balloc - payload->bused;
    2860        49096 :     char *p = pk_encode((dbvalue_t **)argv, argc, buffer, false, &bsize, data->skip_decode_idx);
    2861        49096 :     if (!p) return cloudsync_set_error(data, "An error occurred while encoding payload", DBRES_ERROR);
    2862              :     
    2863              :     // update buffer
    2864        49096 :     payload->bused += breq;
    2865              :     
    2866              :     // increment row counter
    2867        49096 :     ++payload->nrows;
    2868              :     
    2869        49096 :     return DBRES_OK;
    2870        49096 : }
    2871              : 
    2872          657 : int cloudsync_payload_encode_final (cloudsync_payload_context *payload, cloudsync_context *data) {
    2873              :     DEBUG_FUNCTION("cloudsync_payload_encode_final");
    2874              :     
    2875          657 :     if (payload->nrows == 0) {
    2876            8 :         if (payload->buffer) cloudsync_memory_free(payload->buffer);
    2877            8 :         payload->buffer = NULL;
    2878            8 :         payload->bsize = 0;
    2879            8 :         return DBRES_OK;
    2880              :     }
    2881              :     
    2882          649 :     if (payload->nrows > UINT32_MAX) {
    2883            0 :         if (payload->buffer) cloudsync_memory_free(payload->buffer);
    2884            0 :         payload->buffer = NULL;
    2885            0 :         payload->bsize = 0;
    2886            0 :         cloudsync_set_error(data, "Maximum number of payload rows reached", DBRES_ERROR);
    2887            0 :         return DBRES_ERROR;
    2888              :     }
    2889              :     
    2890              :     // sanity check about buffer size
    2891          649 :     int header_size = (int)sizeof(cloudsync_payload_header);
    2892          649 :     int64_t buffer_size = (int64_t)payload->bused - (int64_t)header_size;
    2893          649 :     if (buffer_size < 0) {
    2894            0 :         if (payload->buffer) cloudsync_memory_free(payload->buffer);
    2895            0 :         payload->buffer = NULL;
    2896            0 :         payload->bsize = 0;
    2897            0 :         cloudsync_set_error(data, "cloudsync_encode: internal size underflow", DBRES_ERROR);
    2898            0 :         return DBRES_ERROR;
    2899              :     }
    2900          649 :     if (buffer_size > INT_MAX) {
    2901            0 :         if (payload->buffer) cloudsync_memory_free(payload->buffer);
    2902            0 :         payload->buffer = NULL;
    2903            0 :         payload->bsize = 0;
    2904            0 :         cloudsync_set_error(data, "cloudsync_encode: payload too large to compress (INT_MAX limit)", DBRES_ERROR);
    2905            0 :         return DBRES_ERROR;
    2906              :     }
    2907              :     // try to allocate buffer used for compressed data
    2908          649 :     int real_buffer_size = (int)buffer_size;
    2909          649 :     int zbound = LZ4_compressBound(real_buffer_size);
    2910          649 :     char *zbuffer = cloudsync_memory_alloc(zbound + header_size); // if for some reasons allocation fails then just skip compression
    2911              :     
    2912              :     // skip the reserved header from the buffer to compress
    2913          649 :     char *src_buffer = payload->buffer + sizeof(cloudsync_payload_header);
    2914          649 :     int zused = (zbuffer) ? LZ4_compress_default(src_buffer, zbuffer+header_size, real_buffer_size, zbound) : 0;
    2915          649 :     bool use_uncompressed_buffer = (!zused || zused > real_buffer_size);
    2916          649 :     CHECK_FORCE_UNCOMPRESSED_BUFFER();
    2917              :     
    2918              :     // setup payload header
    2919          649 :     cloudsync_payload_header header = {0};
    2920          649 :     uint32_t expanded_size = (use_uncompressed_buffer) ? 0 : real_buffer_size;
    2921          649 :     cloudsync_payload_header_init(&header, expanded_size, payload->ncols, (uint32_t)payload->nrows, data->schema_hash);
    2922              :     
    2923              :     // if compression fails or if compressed size is bigger than original buffer, then use the uncompressed buffer
    2924          649 :     if (use_uncompressed_buffer) {
    2925            2 :         if (zbuffer) cloudsync_memory_free(zbuffer);
    2926            2 :         zbuffer = payload->buffer;
    2927            2 :         zused = real_buffer_size;
    2928            2 :     }
    2929              :     
    2930              :     // compute checksum of the buffer
    2931          649 :     uint64_t checksum = pk_checksum(zbuffer + header_size, zused);
    2932          649 :     cloudsync_payload_checksum_store(&header, checksum);
    2933              :     
    2934              :     // copy header and data to SQLite BLOB
    2935          649 :     memcpy(zbuffer, &header, sizeof(cloudsync_payload_header));
    2936          649 :     int blob_size = zused + sizeof(cloudsync_payload_header);
    2937          649 :     payload->bsize = blob_size;
    2938              :     
    2939              :     // cleanup memory
    2940          649 :     if (zbuffer != payload->buffer) {
    2941          647 :         cloudsync_memory_free (payload->buffer);
    2942          647 :         payload->buffer = zbuffer;
    2943          647 :     }
    2944              :     
    2945          649 :     return DBRES_OK;
    2946          657 : }
    2947              : 
    2948          657 : char *cloudsync_payload_blob (cloudsync_payload_context *payload, int64_t *blob_size, int64_t *nrows) {
    2949              :     DEBUG_FUNCTION("cloudsync_payload_blob");
    2950              :     
    2951          657 :     if (blob_size) *blob_size = (int64_t)payload->bsize;
    2952          657 :     if (nrows) *nrows = (int64_t)payload->nrows;
    2953          657 :     return payload->buffer;
    2954              : }
    2955              : 
    2956       441144 : static int cloudsync_payload_decode_callback (void *xdata, int index, int type, int64_t ival, double dval, char *pval) {
    2957       441144 :     cloudsync_pk_decode_bind_context *decode_context = (cloudsync_pk_decode_bind_context*)xdata;
    2958       441144 :     int rc = pk_decode_bind_callback(decode_context->vm, index, type, ival, dval, pval);
    2959              :     
    2960       441144 :     if (rc == DBRES_OK) {
    2961              :         // the dbversion index is smaller than seq index, so it is processed first
    2962              :         // when processing the dbversion column: save the value to the tmp_dbversion field
    2963              :         // when processing the seq column: update the dbversion and seq fields only if the current dbversion is greater than the last max value
    2964       441144 :         switch (index) {
    2965              :             case CLOUDSYNC_PK_INDEX_TBL:
    2966        49016 :                 if (type == DBTYPE_TEXT) {
    2967        49016 :                     decode_context->tbl = pval;
    2968        49016 :                     decode_context->tbl_len = ival;
    2969        49016 :                 }
    2970        49016 :                 break;
    2971              :             case CLOUDSYNC_PK_INDEX_PK:
    2972        49016 :                 if (type == DBTYPE_BLOB) {
    2973        49016 :                     decode_context->pk = pval;
    2974        49016 :                     decode_context->pk_len = ival;
    2975        49016 :                 }
    2976        49016 :                 break;
    2977              :             case CLOUDSYNC_PK_INDEX_COLNAME:
    2978        49016 :                 if (type == DBTYPE_TEXT) {
    2979        49016 :                     decode_context->col_name = pval;
    2980        49016 :                     decode_context->col_name_len = ival;
    2981        49016 :                 }
    2982        49016 :                 break;
    2983              :             case CLOUDSYNC_PK_INDEX_COLVERSION:
    2984        49016 :                 if (type == DBTYPE_INTEGER) decode_context->col_version = ival;
    2985        49016 :                 break;
    2986              :             case CLOUDSYNC_PK_INDEX_DBVERSION:
    2987        49016 :                 if (type == DBTYPE_INTEGER) decode_context->db_version = ival;
    2988        49016 :                 break;
    2989              :             case CLOUDSYNC_PK_INDEX_SITEID:
    2990        49016 :                 if (type == DBTYPE_BLOB) {
    2991        49016 :                     decode_context->site_id = pval;
    2992        49016 :                     decode_context->site_id_len = ival;
    2993        49016 :                 }
    2994        49016 :                 break;
    2995              :             case CLOUDSYNC_PK_INDEX_CL:
    2996        49016 :                 if (type == DBTYPE_INTEGER) decode_context->cl = ival;
    2997        49016 :                 break;
    2998              :             case CLOUDSYNC_PK_INDEX_SEQ:
    2999        49016 :                 if (type == DBTYPE_INTEGER) decode_context->seq = ival;
    3000        49016 :                 break;
    3001              :         }
    3002       441144 :     }
    3003              :         
    3004       441144 :     return rc;
    3005              : }
    3006              : 
    3007              : // #ifndef CLOUDSYNC_OMIT_RLS_VALIDATION
    3008              : 
    3009          645 : int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int blen, int *pnrows) {
    3010              :     // sanity check
    3011          645 :     if (blen < (int)sizeof(cloudsync_payload_header)) return cloudsync_set_error(data, "Error on cloudsync_payload_apply: invalid payload length", DBRES_MISUSE);
    3012              :     
    3013              :     // decode header
    3014              :     cloudsync_payload_header header;
    3015          645 :     memcpy(&header, payload, sizeof(cloudsync_payload_header));
    3016              :     
    3017          645 :     header.signature = ntohl(header.signature);
    3018          645 :     header.expanded_size = ntohl(header.expanded_size);
    3019          645 :     header.ncols = ntohs(header.ncols);
    3020          645 :     header.nrows = ntohl(header.nrows);
    3021          645 :     header.schema_hash = ntohll(header.schema_hash);
    3022              :     
    3023              :     // compare schema_hash only if not disabled and if the received payload was created with the current header version
    3024              :     // to avoid schema hash mismatch when processed by a peer with a different extension version during software updates.
    3025          645 :     if (dbutils_settings_get_int64_value(data, CLOUDSYNC_KEY_SKIP_SCHEMA_HASH_CHECK) == 0 && header.version == CLOUDSYNC_PAYLOAD_VERSION_LATEST ) {
    3026          645 :         if (header.schema_hash != data->schema_hash) {
    3027            3 :             if (!database_check_schema_hash(data, header.schema_hash)) {
    3028              :                 char buffer[1024];
    3029            2 :                 snprintf(buffer, sizeof(buffer), "Cannot apply the received payload because the schema hash is unknown %llu.", header.schema_hash);
    3030            2 :                 return cloudsync_set_error(data, buffer, DBRES_MISUSE);
    3031              :             }
    3032            1 :         }
    3033          643 :     }
    3034              :     
    3035              :     // sanity check header
    3036          643 :     if ((header.signature != CLOUDSYNC_PAYLOAD_SIGNATURE) || (header.ncols == 0)) {
    3037            0 :         return cloudsync_set_error(data, "Error on cloudsync_payload_apply: invalid signature or column size", DBRES_MISUSE);
    3038              :     }
    3039              :     
    3040          643 :     const char *buffer = payload + sizeof(cloudsync_payload_header);
    3041          643 :     size_t buf_len = (size_t)blen - sizeof(cloudsync_payload_header);
    3042              : 
    3043              :     // sanity check checksum (only if version is >= 2)
    3044          643 :     if (header.version >= CLOUDSYNC_PAYLOAD_MIN_VERSION_WITH_CHECKSUM) {
    3045          643 :         uint64_t checksum = pk_checksum(buffer, buf_len);
    3046          643 :         if (cloudsync_payload_checksum_verify(&header, checksum) == false) {
    3047            0 :             return cloudsync_set_error(data, "Error on cloudsync_payload_apply: invalid checksum", DBRES_MISUSE);
    3048              :         }
    3049          643 :     }
    3050              : 
    3051              :     // check if payload is compressed
    3052          643 :     char *clone = NULL;
    3053          643 :     if (header.expanded_size != 0) {
    3054          641 :         clone = (char *)cloudsync_memory_alloc(header.expanded_size);
    3055          641 :         if (!clone) return cloudsync_set_error(data, "Unable to allocate memory to uncompress payload", DBRES_NOMEM);
    3056              : 
    3057          641 :         int lz4_rc = LZ4_decompress_safe(buffer, clone, (int)buf_len, (int)header.expanded_size);
    3058          641 :         if (lz4_rc <= 0 || (uint32_t)lz4_rc != header.expanded_size) {
    3059            0 :             if (clone) cloudsync_memory_free(clone);
    3060            0 :             return cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to decompress BLOB", DBRES_MISUSE);
    3061              :         }
    3062              : 
    3063          641 :         buffer = (const char *)clone;
    3064          641 :         buf_len = (size_t)header.expanded_size;
    3065          641 :     }
    3066              :     
    3067              :     // precompile the insert statement
    3068          643 :     dbvm_t *vm = NULL;
    3069          643 :     int rc = databasevm_prepare(data, SQL_CHANGES_INSERT_ROW, &vm, 0);
    3070          643 :     if (rc != DBRES_OK) {
    3071            0 :         if (clone) cloudsync_memory_free(clone);
    3072            0 :         return cloudsync_set_error(data, "Error on cloudsync_payload_apply: error while compiling SQL statement", rc);
    3073              :     }
    3074              :     
    3075              :     // process buffer, one row at a time
    3076          643 :     uint16_t ncols = header.ncols;
    3077          643 :     uint32_t nrows = header.nrows;
    3078          643 :     int64_t last_payload_db_version = -1;
    3079          643 :     int dbversion = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_CHECK_DBVERSION);
    3080          643 :     int seq = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_CHECK_SEQ);
    3081          643 :     cloudsync_pk_decode_bind_context decoded_context = {.vm = vm};
    3082              : 
    3083              :     // Initialize deferred column-batch merge
    3084          643 :     merge_pending_batch batch = {0};
    3085          643 :     data->pending_batch = &batch;
    3086          643 :     bool in_savepoint = false;
    3087          643 :     const void *last_pk = NULL;
    3088          643 :     int64_t last_pk_len = 0;
    3089          643 :     const char *last_tbl = NULL;
    3090          643 :     int64_t last_tbl_len = 0;
    3091              : 
    3092        49659 :     for (uint32_t i=0; i<nrows; ++i) {
    3093        49016 :         size_t seek = 0;
    3094        49016 :         int res = pk_decode((char *)buffer, buf_len, ncols, &seek, data->skip_decode_idx, cloudsync_payload_decode_callback, &decoded_context);
    3095        49016 :         if (res == -1) {
    3096            0 :             merge_flush_pending(data);
    3097            0 :             data->pending_batch = NULL;
    3098            0 :             if (batch.cached_vm) { databasevm_finalize(batch.cached_vm); batch.cached_vm = NULL; }
    3099            0 :             if (batch.cached_col_names) { cloudsync_memory_free(batch.cached_col_names); batch.cached_col_names = NULL; }
    3100            0 :             if (batch.entries) { cloudsync_memory_free(batch.entries); batch.entries = NULL; }
    3101            0 :             if (in_savepoint) database_rollback_savepoint(data, "cloudsync_payload_apply");
    3102            0 :             rc = DBRES_ERROR;
    3103            0 :             goto cleanup;
    3104              :         }
    3105              : 
    3106              :         // Detect PK/table/db_version boundary to flush pending batch
    3107        97389 :         bool pk_changed = (last_pk != NULL &&
    3108        48373 :                            (last_pk_len != decoded_context.pk_len ||
    3109        46452 :                             memcmp(last_pk, decoded_context.pk, last_pk_len) != 0));
    3110        97389 :         bool tbl_changed = (last_tbl != NULL &&
    3111        48373 :                             (last_tbl_len != decoded_context.tbl_len ||
    3112        48303 :                              memcmp(last_tbl, decoded_context.tbl, last_tbl_len) != 0));
    3113        49016 :         bool db_version_changed = (last_payload_db_version != decoded_context.db_version);
    3114              : 
    3115              :         // Flush pending batch before any boundary change
    3116        49016 :         if (pk_changed || tbl_changed || db_version_changed) {
    3117        18158 :             int flush_rc = merge_flush_pending(data);
    3118        18158 :             if (flush_rc != DBRES_OK) {
    3119            1 :                 rc = flush_rc;
    3120              :                 // continue processing remaining rows
    3121            1 :             }
    3122        18158 :         }
    3123              : 
    3124              :         // Per-db_version savepoints group rows with the same source db_version
    3125              :         // into one transaction. In SQLite autocommit mode, the RELEASE triggers
    3126              :         // the commit hook which bumps data->db_version and resets seq, ensuring
    3127              :         // unique (db_version, seq) tuples across groups. In PostgreSQL SPI,
    3128              :         // database_in_transaction() is always true so this block is inactive —
    3129              :         // the inner per-PK savepoint in merge_flush_pending handles RLS instead.
    3130        49016 :         if (in_savepoint && db_version_changed) {
    3131         4255 :             rc = database_commit_savepoint(data, "cloudsync_payload_apply");
    3132         4255 :             if (rc != DBRES_OK) {
    3133            0 :                 merge_pending_free_entries(&batch);
    3134            0 :                 data->pending_batch = NULL;
    3135            0 :                 cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to release a savepoint", rc);
    3136            0 :                 goto cleanup;
    3137              :             }
    3138         4255 :             in_savepoint = false;
    3139         4255 :         }
    3140              : 
    3141        49016 :         if (!in_savepoint && db_version_changed && !database_in_transaction(data)) {
    3142         4898 :             rc = database_begin_savepoint(data, "cloudsync_payload_apply");
    3143         4898 :             if (rc != DBRES_OK) {
    3144            0 :                 merge_pending_free_entries(&batch);
    3145            0 :                 data->pending_batch = NULL;
    3146            0 :                 cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to start a transaction", rc);
    3147            0 :                 goto cleanup;
    3148              :             }
    3149         4898 :             in_savepoint = true;
    3150         4898 :         }
    3151              : 
    3152              :         // Track db_version for batch-flush boundary detection
    3153        49016 :         if (db_version_changed) {
    3154         4898 :             last_payload_db_version = decoded_context.db_version;
    3155         4898 :         }
    3156              : 
    3157              :         // Update PK/table tracking
    3158        49016 :         last_pk = decoded_context.pk;
    3159        49016 :         last_pk_len = decoded_context.pk_len;
    3160        49016 :         last_tbl = decoded_context.tbl;
    3161        49016 :         last_tbl_len = decoded_context.tbl_len;
    3162              : 
    3163        49016 :         rc = databasevm_step(vm);
    3164        49016 :         if (rc != DBRES_DONE) {
    3165              :             // don't "break;", the error can be due to a RLS policy.
    3166              :             // in case of error we try to apply the following changes
    3167            0 :         }
    3168              : 
    3169        49016 :         buffer += seek;
    3170        49016 :         buf_len -= seek;
    3171        49016 :         dbvm_reset(vm);
    3172        49016 :     }
    3173              : 
    3174              :     // Final flush after loop
    3175              :     {
    3176          643 :         int flush_rc = merge_flush_pending(data);
    3177          643 :         if (flush_rc != DBRES_OK && rc == DBRES_OK) rc = flush_rc;
    3178              :     }
    3179          643 :     data->pending_batch = NULL;
    3180              : 
    3181          643 :     if (in_savepoint) {
    3182          643 :         int rc1 = database_commit_savepoint(data, "cloudsync_payload_apply");
    3183          643 :         if (rc1 != DBRES_OK) rc = rc1;
    3184          643 :     }
    3185              : 
    3186              :     // save last error (unused if function returns OK)
    3187          643 :     if (rc != DBRES_OK && rc != DBRES_DONE) {
    3188            0 :         cloudsync_set_dberror(data);
    3189            0 :     }
    3190              : 
    3191          643 :     if (rc == DBRES_DONE) rc = DBRES_OK;
    3192         1286 :     if (rc == DBRES_OK) {
    3193              :         char buf[256];
    3194          643 :         if (decoded_context.db_version >= dbversion) {
    3195          510 :             snprintf(buf, sizeof(buf), "%" PRId64, decoded_context.db_version);
    3196          510 :             dbutils_settings_set_key_value(data, CLOUDSYNC_KEY_CHECK_DBVERSION, buf);
    3197              :             
    3198          510 :             if (decoded_context.seq != seq) {
    3199          327 :                 snprintf(buf, sizeof(buf), "%" PRId64, decoded_context.seq);
    3200          327 :                 dbutils_settings_set_key_value(data, CLOUDSYNC_KEY_CHECK_SEQ, buf);
    3201          327 :             }
    3202          510 :         }
    3203          643 :     }
    3204              : 
    3205              : cleanup:
    3206              :     // cleanup merge_pending_batch
    3207          643 :     if (batch.cached_vm) { databasevm_finalize(batch.cached_vm); batch.cached_vm = NULL; }
    3208          643 :     if (batch.cached_col_names) { cloudsync_memory_free(batch.cached_col_names); batch.cached_col_names = NULL; }
    3209          643 :     if (batch.entries) { cloudsync_memory_free(batch.entries); batch.entries = NULL; }  
    3210              : 
    3211              :     // cleanup vm
    3212          643 :     if (vm) databasevm_finalize(vm);
    3213              : 
    3214              :     // cleanup memory
    3215          643 :     if (clone) cloudsync_memory_free(clone);
    3216              : 
    3217              :     // error already saved in (save last error)
    3218          643 :     if (rc != DBRES_OK) return rc;
    3219              : 
    3220              :     // return the number of processed rows
    3221          643 :     if (pnrows) *pnrows = nrows;
    3222          643 :     return DBRES_OK;
    3223          645 : }
    3224              : 
    3225              : // MARK: - Payload load/store -
    3226              : 
    3227            0 : int cloudsync_payload_get (cloudsync_context *data, char **blob, int *blob_size, int *db_version, int64_t *new_db_version) {
    3228              :     // retrieve current db_version and seq
    3229            0 :     *db_version = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_SEND_DBVERSION);
    3230            0 :     if (*db_version < 0) return DBRES_ERROR;
    3231              :     
    3232              :     // retrieve BLOB
    3233              :     char sql[1024];
    3234            0 :     snprintf(sql, sizeof(sql), "WITH max_db_version AS (SELECT MAX(db_version) AS max_db_version FROM cloudsync_changes WHERE site_id=cloudsync_siteid()) "
    3235              :                                "SELECT * FROM (SELECT cloudsync_payload_encode(tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq) AS payload, max_db_version AS max_db_version FROM cloudsync_changes, max_db_version WHERE site_id=cloudsync_siteid() AND db_version>%d) WHERE payload IS NOT NULL", *db_version);
    3236              :     
    3237            0 :     int64_t len = 0;
    3238            0 :     int rc = database_select_blob_int(data, sql, blob, &len, new_db_version);
    3239            0 :     *blob_size = (int)len;
    3240            0 :     if (rc != DBRES_OK) return rc;
    3241              :     
    3242              :     // exit if there is no data to send
    3243            0 :     if (*blob == NULL || *blob_size == 0) return DBRES_OK;
    3244            0 :     return rc;
    3245            0 : }
    3246              : 
    3247              : #ifdef CLOUDSYNC_DESKTOP_OS
    3248            0 : int cloudsync_payload_save (cloudsync_context *data, const char *payload_path, int *size) {
    3249              :     DEBUG_FUNCTION("cloudsync_payload_save");
    3250              :     
    3251              :     // silently delete any other payload with the same name
    3252            0 :     cloudsync_file_delete(payload_path);
    3253              :     
    3254              :     // retrieve payload
    3255            0 :     char *blob = NULL;
    3256            0 :     int blob_size = 0, db_version = 0;
    3257            0 :     int64_t new_db_version = 0;
    3258            0 :     int rc = cloudsync_payload_get(data, &blob, &blob_size, &db_version, &new_db_version);
    3259            0 :     if (rc != DBRES_OK) {
    3260            0 :         if (db_version < 0) return cloudsync_set_error(data, "Unable to retrieve db_version", rc);
    3261            0 :         return cloudsync_set_error(data, "Unable to retrieve changes in cloudsync_payload_save", rc);
    3262              :     }
    3263              :     
    3264              :     // exit if there is no data to save
    3265            0 :     if (blob == NULL || blob_size == 0) {
    3266            0 :         if (size) *size = 0;
    3267            0 :         return DBRES_OK;
    3268              :     }
    3269              :     
    3270              :     // write payload to file
    3271            0 :     bool res = cloudsync_file_write(payload_path, blob, (size_t)blob_size);
    3272            0 :     cloudsync_memory_free(blob);
    3273            0 :     if (res == false) {
    3274            0 :         return cloudsync_set_error(data, "Unable to write payload to file path", DBRES_IOERR);
    3275              :     }
    3276              :     
    3277              :     // returns blob size
    3278            0 :     if (size) *size = blob_size;
    3279            0 :     return DBRES_OK;
    3280            0 : }
    3281              : #endif
    3282              : 
    3283              : // MARK: - Core -
    3284              : 
    3285          251 : int cloudsync_table_sanity_check (cloudsync_context *data, const char *name, bool skip_int_pk_check) {
    3286              :     DEBUG_DBFUNCTION("cloudsync_table_sanity_check %s", name);
    3287              :     char buffer[2048];
    3288              :     
    3289              :     // sanity check table name
    3290          251 :     if (name == NULL) {
    3291            1 :         return cloudsync_set_error(data, "cloudsync_init requires a non-null table parameter", DBRES_ERROR);
    3292              :     }
    3293              :     
    3294              :     // avoid allocating heap memory for SQL statements by setting a maximum length of 512 characters
    3295              :     // for table names. This limit is reasonable and helps prevent memory management issues.
    3296          250 :     const size_t maxlen = CLOUDSYNC_MAX_TABLENAME_LEN;
    3297          250 :     if (strlen(name) > maxlen) {
    3298            1 :         snprintf(buffer, sizeof(buffer), "Table name cannot be longer than %d characters", (int)maxlen);
    3299            1 :         return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3300              :     }
    3301              :     
    3302              :     // check if already initialized
    3303          249 :     cloudsync_table_context *table = table_lookup(data, name);
    3304          249 :     if (table) return DBRES_OK;
    3305              :     
    3306              :     // check if table exists
    3307          246 :     if (database_table_exists(data, name, cloudsync_schema(data)) == false) {
    3308            2 :         snprintf(buffer, sizeof(buffer), "Table %s does not exist", name);
    3309            2 :         return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3310              :     }
    3311              :     
    3312              :     // no more than 128 columns can be used as a composite primary key (SQLite hard limit)
    3313          244 :     int npri_keys = database_count_pk(data, name, false, cloudsync_schema(data));
    3314          244 :     if (npri_keys < 0) return cloudsync_set_dberror(data);
    3315          244 :     if (npri_keys > 128) return cloudsync_set_error(data, "No more than 128 columns can be used to form a composite primary key", DBRES_ERROR);
    3316              :     
    3317              :     #if CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
    3318              :     // if count == 0 means that rowid will be used as primary key (BTW: very bad choice for the user)
    3319          244 :     if (npri_keys == 0) {
    3320            1 :         snprintf(buffer, sizeof(buffer), "Rowid only tables are not supported, all primary keys must be explicitly set and declared as NOT NULL (table %s)", name);
    3321            1 :         return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3322              :     }
    3323              :     #endif
    3324              :         
    3325          243 :     if (!skip_int_pk_check) {
    3326          182 :         if (npri_keys == 1) {
    3327              :             // the affinity of a column is determined by the declared type of the column,
    3328              :             // according to the following rules in the order shown:
    3329              :             // 1. If the declared type contains the string "INT" then it is assigned INTEGER affinity.
    3330          100 :             int npri_keys_int = database_count_int_pk(data, name, cloudsync_schema(data));
    3331          100 :             if (npri_keys_int < 0) return cloudsync_set_dberror(data);
    3332          100 :             if (npri_keys == npri_keys_int) {
    3333            1 :                 snprintf(buffer, sizeof(buffer), "Table %s uses a single-column INTEGER primary key. For CRDT replication, primary keys must be globally unique. Consider using a TEXT primary key with UUIDs or ULID to avoid conflicts across nodes. If you understand the risk and still want to use this INTEGER primary key, set the third argument of the cloudsync_init function to 1 to skip this check.", name);
    3334            1 :                 return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3335              :             }
    3336              :             
    3337           99 :         }
    3338          181 :     }
    3339              :         
    3340              :     // if user declared explicit primary key(s) then make sure they are all declared as NOT NULL
    3341              :     #if CLOUDSYNC_CHECK_NOTNULL_PRIKEYS
    3342              :     if (npri_keys > 0) {
    3343              :         int npri_keys_notnull = database_count_pk(data, name, true, cloudsync_schema(data));
    3344              :         if (npri_keys_notnull < 0) return cloudsync_set_dberror(data);
    3345              :         if (npri_keys != npri_keys_notnull) {
    3346              :             snprintf(buffer, sizeof(buffer), "All primary keys must be explicitly declared as NOT NULL (table %s)", name);
    3347              :             return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3348              :         }
    3349              :     }
    3350              :     #endif
    3351              :     
    3352              :     // check for columns declared as NOT NULL without a DEFAULT value.
    3353              :     // Otherwise, col_merge_stmt would fail if changes to other columns are inserted first.
    3354          242 :     int n_notnull_nodefault = database_count_notnull_without_default(data, name, cloudsync_schema(data));
    3355          242 :     if (n_notnull_nodefault < 0) return cloudsync_set_dberror(data);
    3356          242 :     if (n_notnull_nodefault > 0) {
    3357            0 :         snprintf(buffer, sizeof(buffer), "All non-primary key columns declared as NOT NULL must have a DEFAULT value. (table %s)", name);
    3358            0 :         return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3359              :     }
    3360              :     
    3361          242 :     return DBRES_OK;
    3362          251 : }
    3363              : 
    3364            3 : int cloudsync_cleanup_internal (cloudsync_context *data, cloudsync_table_context *table) {
    3365            3 :     if (cloudsync_context_init(data) == NULL) return DBRES_MISUSE;
    3366              :     
    3367              :     // drop meta-table
    3368            3 :     const char *table_name = table->name;
    3369            3 :     char *sql = cloudsync_memory_mprintf(SQL_DROP_CLOUDSYNC_TABLE, table->meta_ref);
    3370            3 :     int rc = database_exec(data, sql);
    3371            3 :     cloudsync_memory_free(sql);
    3372            3 :     if (rc != DBRES_OK) {
    3373              :         char buffer[1024];
    3374            0 :         snprintf(buffer, sizeof(buffer), "Unable to drop cloudsync table %s_cloudsync in cloudsync_cleanup", table_name);
    3375            0 :         return cloudsync_set_error(data, buffer, rc);
    3376              :     }
    3377              :     
    3378              :     // drop original triggers
    3379            3 :     rc = database_delete_triggers(data, table_name);
    3380            3 :     if (rc != DBRES_OK) {
    3381              :         char buffer[1024];
    3382            0 :         snprintf(buffer, sizeof(buffer), "Unable to delete triggers for table %s", table_name);
    3383            0 :         return cloudsync_set_error(data, buffer, rc);
    3384              :     }
    3385              :     
    3386              :     // remove all table related settings
    3387            3 :     dbutils_table_settings_set_key_value(data, table_name, NULL, NULL, NULL);
    3388            3 :     return DBRES_OK;
    3389            3 : }
    3390              : 
    3391            3 : int cloudsync_cleanup (cloudsync_context *data, const char *table_name) {
    3392            3 :     cloudsync_table_context *table = table_lookup(data, table_name);
    3393            3 :     if (!table) return DBRES_OK;
    3394              :     
    3395              :     // TODO: check what happen if cloudsync_cleanup_internal failes (not eveything dropped) and the table is still in memory?
    3396              :     
    3397            3 :     int rc = cloudsync_cleanup_internal(data, table);
    3398            3 :     if (rc != DBRES_OK) return rc;
    3399              :     
    3400            3 :     int counter = table_remove(data, table);
    3401            3 :     table_free(table);
    3402              :     
    3403            3 :     if (counter == 0) {
    3404              :         // cleanup database on last table
    3405            1 :         cloudsync_reset_siteid(data);
    3406            1 :         dbutils_settings_cleanup(data);
    3407            1 :     } else {
    3408            2 :         if (database_internal_table_exists(data, CLOUDSYNC_TABLE_SETTINGS_NAME) == true) {
    3409            2 :             cloudsync_update_schema_hash(data);
    3410            2 :         }
    3411              :     }
    3412              :     
    3413            3 :     return DBRES_OK;
    3414            3 : }
    3415              : 
    3416            0 : int cloudsync_cleanup_all (cloudsync_context *data) {
    3417            0 :     return database_cleanup(data);
    3418              : }
    3419              : 
    3420          402 : int cloudsync_terminate (cloudsync_context *data) {
    3421              :     // can't use for/loop here because data->tables_count is changed by table_remove
    3422          616 :     while (data->tables_count > 0) {
    3423          214 :         cloudsync_table_context *t = data->tables[data->tables_count - 1];
    3424          214 :         table_remove(data, t);
    3425          214 :         table_free(t);
    3426              :     }
    3427              :     
    3428          402 :     if (data->schema_version_stmt) databasevm_finalize(data->schema_version_stmt);
    3429          402 :     if (data->data_version_stmt) databasevm_finalize(data->data_version_stmt);
    3430          402 :     if (data->db_version_stmt) databasevm_finalize(data->db_version_stmt);
    3431          402 :     if (data->getset_siteid_stmt) databasevm_finalize(data->getset_siteid_stmt);
    3432          402 :     if (data->current_schema) cloudsync_memory_free(data->current_schema);
    3433              :     
    3434          402 :     data->schema_version_stmt = NULL;
    3435          402 :     data->data_version_stmt = NULL;
    3436          402 :     data->db_version_stmt = NULL;
    3437          402 :     data->getset_siteid_stmt = NULL;
    3438          402 :     data->current_schema = NULL;
    3439              :     
    3440              :     // reset the site_id so the cloudsync_context_init will be executed again
    3441              :     // if any other cloudsync function is called after terminate
    3442          402 :     data->site_id[0] = 0;
    3443              :     
    3444          402 :     return 1;
    3445              : }
    3446              : 
    3447          246 : int cloudsync_init_table (cloudsync_context *data, const char *table_name, const char *algo_name, bool skip_int_pk_check) {
    3448              :     // sanity check table and its primary key(s)
    3449          246 :     int rc = cloudsync_table_sanity_check(data, table_name, skip_int_pk_check);
    3450          246 :     if (rc != DBRES_OK) return rc;
    3451              :     
    3452              :     // init cloudsync_settings
    3453          244 :     if (cloudsync_context_init(data) == NULL) {
    3454            0 :         return cloudsync_set_error(data, "Unable to initialize cloudsync context", DBRES_MISUSE);
    3455              :     }
    3456              :     
    3457              :     // sanity check algo name (if exists)
    3458          244 :     table_algo algo_new = table_algo_none;
    3459          244 :     if (!algo_name) algo_name = CLOUDSYNC_DEFAULT_ALGO;
    3460              :     
    3461          244 :     algo_new = cloudsync_algo_from_name(algo_name);
    3462          244 :     if (algo_new == table_algo_none) {
    3463              :         char buffer[1024];
    3464            1 :         snprintf(buffer, sizeof(buffer), "Unknown CRDT algorithm name %s", algo_name);
    3465            1 :         return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3466              :     }
    3467              : 
    3468              :     // DWS and AWS algorithms are not yet implemented in the merge logic
    3469          243 :     if (algo_new == table_algo_crdt_dws || algo_new == table_algo_crdt_aws) {
    3470              :         char buffer[1024];
    3471            0 :         snprintf(buffer, sizeof(buffer), "CRDT algorithm %s is not yet supported", algo_name);
    3472            0 :         return cloudsync_set_error(data, buffer, DBRES_ERROR);
    3473              :     }
    3474              :     
    3475              :     // check if table name was already augmented
    3476          243 :     table_algo algo_current = dbutils_table_settings_get_algo(data, table_name);
    3477              :     
    3478              :     // sanity check algorithm
    3479          243 :     if ((algo_new == algo_current) && (algo_current != table_algo_none)) {
    3480              :         // if table algorithms and the same and not none, do nothing
    3481          243 :     } else if ((algo_new == table_algo_none) && (algo_current == table_algo_none)) {
    3482              :         // nothing is written into settings because the default table_algo_crdt_cls will be used
    3483            0 :         algo_new = algo_current = table_algo_crdt_cls;
    3484          216 :     } else if ((algo_new == table_algo_none) && (algo_current != table_algo_none)) {
    3485              :         // algo is already written into settins so just use it
    3486            0 :         algo_new = algo_current;
    3487          216 :     } else if ((algo_new != table_algo_none) && (algo_current == table_algo_none)) {
    3488              :         // write table algo name in settings
    3489          216 :         dbutils_table_settings_set_key_value(data, table_name, "*", "algo", algo_name);
    3490          216 :     } else {
    3491              :         // error condition
    3492            0 :         return cloudsync_set_error(data, "The function cloudsync_cleanup(table) must be called before changing a table algorithm", DBRES_MISUSE);
    3493              :     }
    3494              :     
    3495              :     // Run the following function even if table was already augmented.
    3496              :     // It is safe to call the following function multiple times, if there is nothing to update nothing will be changed.
    3497              :     // After an alter table, in contrast, all the cloudsync triggers, tables and stmts must be recreated.
    3498              :     
    3499              :     // sync algo with table (unused in this version)
    3500              :     // cloudsync_sync_table_key(data, table_name, "*", CLOUDSYNC_KEY_ALGO, crdt_algo_name(algo_new));
    3501              :     
    3502              :     // read row-level filter from settings (if any)
    3503              :     char init_filter_buf[2048];
    3504          243 :     int init_frc = dbutils_table_settings_get_value(data, table_name, "*", "filter", init_filter_buf, sizeof(init_filter_buf));
    3505          243 :     const char *init_filter = (init_frc == DBRES_OK && init_filter_buf[0]) ? init_filter_buf : NULL;
    3506              : 
    3507              :     // check triggers
    3508          243 :     rc = database_create_triggers(data, table_name, algo_new, init_filter);
    3509          243 :     if (rc != DBRES_OK) return cloudsync_set_error(data, "An error occurred while creating triggers", DBRES_MISUSE);
    3510              :     
    3511              :     // check meta-table
    3512          243 :     rc = database_create_metatable(data, table_name);
    3513          243 :     if (rc != DBRES_OK) return cloudsync_set_error(data, "An error occurred while creating metatable", DBRES_MISUSE);
    3514              :     
    3515              :     // add prepared statements
    3516          243 :     if (cloudsync_add_dbvms(data) != DBRES_OK) {
    3517            0 :         return cloudsync_set_error(data, "An error occurred while trying to compile prepared SQL statements", DBRES_MISUSE);
    3518              :     }
    3519              :     
    3520              :     // add table to in-memory data context
    3521          243 :     if (table_add_to_context(data, algo_new, table_name) == false) {
    3522              :         char buffer[1024];
    3523            0 :         snprintf(buffer, sizeof(buffer), "An error occurred while adding %s table information to global context", table_name);
    3524            0 :         return cloudsync_set_error(data, buffer, DBRES_MISUSE);
    3525              :     }
    3526              :     
    3527          243 :     if (cloudsync_refill_metatable(data, table_name) != DBRES_OK) {
    3528            0 :         return cloudsync_set_error(data, "An error occurred while trying to fill the augmented table", DBRES_MISUSE);
    3529              :     }
    3530              :         
    3531          243 :     return DBRES_OK;
    3532          246 : }
        

Generated by: LCOV version 2.4-0