diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 3cc9fc32..e9b2541d 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -27,7 +27,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go-version: [ '1.22' ] + go-version: ['1.22'] steps: - uses: actions/checkout@v3 - uses: actions/setup-go@v3 @@ -36,6 +36,10 @@ jobs: - uses: acifani/setup-tinygo@v2 with: tinygo-version: '0.31.2' + - name: Install build tools + run: | + sudo apt-get update + sudo apt-get install -y build-essential git curl - name: golangci-lint uses: golangci/golangci-lint-action@v3 with: diff --git a/.github/workflows/statestore-test.yml b/.github/workflows/statestore-test.yml new file mode 100644 index 00000000..d9c5272c --- /dev/null +++ b/.github/workflows/statestore-test.yml @@ -0,0 +1,162 @@ +name: StateStore Test + +on: + push: + branches: [ main, master, develop ] + paths: + - 'fs/statestore/**' + - 'scripts/build-rocksdb.sh' + - 'Makefile' + - '.github/workflows/statestore-test.yml' + pull_request: + branches: [ main, master, develop ] + paths: + - 'fs/statestore/**' + - 'scripts/build-rocksdb.sh' + - 'Makefile' + - '.github/workflows/statestore-test.yml' + workflow_dispatch: + +jobs: + test-linux: + name: Test on Linux + runs-on: ubuntu-latest + strategy: + matrix: + test-type: [pebble, rocksdb] + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version: '1.22' + + - name: Install build tools + run: | + sudo apt-get update + sudo apt-get install -y build-essential git curl + + - name: Test Pebble + if: matrix.test-type == 'pebble' + run: make test-statestore + + - name: Test RocksDB + if: matrix.test-type == 'rocksdb' + run: make test-statestore-rocksdb + + - name: Build with Pebble + if: matrix.test-type == 'pebble' + run: make build-lite + + - name: Build with RocksDB + if: matrix.test-type == 'rocksdb' + run: make build-all + + - name: Verify binary + run: | + if [ ! -f bin/function-stream ]; then + echo "❌ Binary not found!" + exit 1 + fi + echo "✅ Binary created: bin/function-stream" + ls -lh bin/function-stream + + - name: Upload Pebble build artifacts + if: matrix.test-type == 'pebble' + uses: actions/upload-artifact@v4 + with: + name: function-stream-linux-pebble + path: bin/function-stream + if-no-files-found: error + retention-days: 7 + + - name: Upload RocksDB build artifacts + if: matrix.test-type == 'rocksdb' + uses: actions/upload-artifact@v4 + with: + name: function-stream-linux-rocksdb + path: | + bin/function-stream + bin/function-stream/lib/rocksdb/lib/*.a + if-no-files-found: ignore + retention-days: 7 + + test-all-statestore: + name: Test All StateStore Implementations + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version: '1.22' + + - name: Install build tools + run: | + sudo apt-get update + sudo apt-get install -y build-essential git curl + + - name: Test Pebble + run: make test-statestore + + - name: Test RocksDB + run: make test-statestore-rocksdb + + - name: Build RocksDB variant + run: make build-all + + - name: Verify build artifacts + run: | + echo "=== Build Artifacts ===" + if [ -f bin/function-stream ]; then + echo "✅ Binary: bin/function-stream" + ls -lh bin/function-stream + else + echo "❌ Binary not found!" 
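+            # exit non-zero so a missing binary fails the job instead of passing silently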
+            exit 1
+          fi
+          echo ""
+          echo "=== RocksDB Libraries ==="
+          if [ -d bin/function-stream/lib/rocksdb/lib ]; then
+            ls -lh bin/function-stream/lib/rocksdb/lib/*.a 2>/dev/null || echo "No library files found"
+          else
+            echo "⚠️ RocksDB library directory not found"
+          fi
+
+      - name: Upload all artifacts
+        uses: actions/upload-artifact@v4
+        with:
+          name: function-stream-linux-complete
+          path: |
+            bin/function-stream
+            bin/function-stream/lib/rocksdb/lib/*.a
+          if-no-files-found: ignore
+          retention-days: 7
+
+  summary:
+    name: Test Summary
+    runs-on: ubuntu-latest
+    needs: [test-linux, test-all-statestore]
+    if: always()
+    steps:
+      - name: Generate summary
+        run: |
+          echo "## 📊 StateStore Test Results" >> $GITHUB_STEP_SUMMARY
+          echo "" >> $GITHUB_STEP_SUMMARY
+          echo "### Test Results by Platform" >> $GITHUB_STEP_SUMMARY
+          echo "" >> $GITHUB_STEP_SUMMARY
+          echo "| Platform | StateStore | Status |" >> $GITHUB_STEP_SUMMARY
+          echo "|----------|------------|--------|" >> $GITHUB_STEP_SUMMARY
+          # NOTE: both matrix legs share the single test-linux job result, so
+          # the two rows below always show the combined outcome.
+          echo "| Linux | Pebble | ${{ needs.test-linux.result == 'success' && '✅ Pass' || '❌ Fail' }} |" >> $GITHUB_STEP_SUMMARY
+          echo "| Linux | RocksDB | ${{ needs.test-linux.result == 'success' && '✅ Pass' || '❌ Fail' }} |" >> $GITHUB_STEP_SUMMARY
+          echo "" >> $GITHUB_STEP_SUMMARY
+          echo "### Integration Tests" >> $GITHUB_STEP_SUMMARY
+          echo "" >> $GITHUB_STEP_SUMMARY
+          echo "| Platform | Status |" >> $GITHUB_STEP_SUMMARY
+          echo "|----------|--------|" >> $GITHUB_STEP_SUMMARY
+          echo "| All Tests (Linux) | ${{ needs.test-all-statestore.result == 'success' && '✅ Pass' || '❌ Fail' }} |" >> $GITHUB_STEP_SUMMARY

diff --git a/.gitignore b/.gitignore
index dc933403..90722484 100644
--- a/.gitignore
+++ b/.gitignore
@@ -108,9 +108,17 @@ dist
 bin/
 .DS_Store
 
+# CI artifacts
+.github/workflows/*.log
+
 benchmark/*.pprof
 
 operator/vendor/
 
+# RocksDB in bin/lib directory (build artifacts)
+bin/lib/rocksdb/lib/
+bin/lib/rocksdb/src/
+bin/lib/rocksdb/include/rocksdb/
+
 ._*
 
 **/.DS_Store

diff --git a/Makefile b/Makefile
index 8a482924..8b251dde 100644
--- a/Makefile
+++ b/Makefile
@@ -10,9 +10,95 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-.PHONY: license
-build:
+.PHONY: license setup-rocksdb clean-rocksdb clean-rocksdb-intermediate setup-rocksdb-deps build build-all build-lite test-rocksdb test-statestore-rocksdb test-statestore test-statestore-all install
+
+# Default build: alias for build-all (builds with RocksDB support)
+build: build-all
+
+# Build with RocksDB support (requires RocksDB installation)
+build-all: setup-rocksdb
+	@echo ""
+	@echo "Building with RocksDB support..."
+	@chmod +x scripts/setup-rocksdb-deps.sh
+	@./scripts/setup-rocksdb-deps.sh check
+	@echo "Generating link flags..."
+	@ROCKSDB_LIB_DIR=$$(pwd)/bin/function-stream/lib/rocksdb/lib; \
+	ROCKSDB_INCLUDE_DIR=$$(pwd)/bin/function-stream/lib/rocksdb/include; \
+	TMP_LDFLAGS=$$(mktemp); \
+	trap "rm -f $$TMP_LDFLAGS" EXIT; \
+	./scripts/setup-rocksdb-deps.sh ldflags "$$ROCKSDB_LIB_DIR" "$$TMP_LDFLAGS"; \
+	CGO_LDFLAGS=$$(cat $$TMP_LDFLAGS); \
+	PKG_CONFIG_PATH="" \
+	CGO_ENABLED=1 \
+	CGO_CFLAGS="-I$$ROCKSDB_INCLUDE_DIR" \
+	CGO_LDFLAGS="$$CGO_LDFLAGS" \
+	go build -v -tags "rocksdb,grocksdb_no_link" -ldflags="-s -w" -o bin/function-stream ./cmd
+	@echo ""
+	@echo "Checking binary dependencies..."
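+	@# NOTE: otool exists only on macOS; on Linux runners the guard below skips this check.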
+ @if command -v otool >/dev/null 2>&1; then \ + echo "Binary dependencies:"; \ + otool -L bin/function-stream 2>/dev/null | \ + grep -v "^bin/function-stream:" | \ + grep -v "^\t/usr/lib/" | \ + grep -v "^\t/System/Library/" | \ + head -10 || echo " ✅ Only system libraries (compatible with all macOS)"; \ + fi + @echo "" + @echo "Cleaning RocksDB files for packaging..." + @chmod +x scripts/build-rocksdb.sh + @./scripts/build-rocksdb.sh clean-for-package + @echo "✅ Build completed" + @echo "" + @echo "📦 Binary: bin/function-stream" + @echo " - RocksDB: ✅ Statically linked" + @echo " - System libraries (bz2, z): ✅ Available on all macOS systems" + @echo " - Ready to distribute to other macOS systems" + +# Build all features - Lite version without RocksDB (only Pebble support) +build-lite: + @echo "Building Lite version (Pebble only, no RocksDB)..." go build -v -o bin/function-stream ./cmd + @echo "✅ Lite build completed (Pebble only)" + +# Setup RocksDB in bin/function-stream/lib directory +setup-rocksdb: + @echo "Checking and installing RocksDB dependencies..." + @chmod +x scripts/setup-rocksdb-deps.sh scripts/build-rocksdb.sh + @./scripts/setup-rocksdb-deps.sh check + @echo "" + @echo "Downloading and building RocksDB to bin/function-stream/lib directory..." + @./scripts/build-rocksdb.sh build + +# Clean RocksDB in bin/function-stream/lib directory (complete cleanup, including tarball) +clean-rocksdb: + @echo "Cleaning RocksDB in bin/function-stream/lib directory..." + @chmod +x scripts/build-rocksdb.sh + @./scripts/build-rocksdb.sh clean + +# Clean RocksDB intermediate files (keep binaries, clean source code and tarball) +clean-rocksdb-intermediate: + @echo "Cleaning RocksDB intermediate files (keeping binaries)..." + @chmod +x scripts/build-rocksdb.sh + @./scripts/build-rocksdb.sh clean-intermediate + +# Install (pre-install RocksDB, then execute installation, finally clean intermediate files) +install: setup-rocksdb + @echo "✅ RocksDB pre-installation completed" + @echo "" + @echo "Executing project installation..." + @# Add your project installation steps here + @# For example: go install ./... + @echo "" + @echo "Cleaning RocksDB intermediate files..." + @$(MAKE) clean-rocksdb-intermediate + @echo "✅ Installation completed" + +# Install RocksDB dependencies +setup-rocksdb-deps: + @chmod +x scripts/setup-rocksdb-deps.sh + @./scripts/setup-rocksdb-deps.sh install + build-example: tinygo build -o bin/example_basic.wasm -target=wasi ./examples/basic @@ -27,48 +113,76 @@ lint: lint-fix: golangci-lint run --fix -build-all: build build-example +build-with-examples: build build-example test: go test -race ./... -timeout 10m -bench: - go test -bench=. ./benchmark -timeout 10m - -bench_race: - go test -race -bench=. ./benchmark -timeout 10m - -get-apidocs: - curl -o apidocs.json http://localhost:7300/apidocs - -ADMIN_CLIENT_DIR := admin/client -FILES_TO_REMOVE := go.mod go.sum .travis.yml .openapi-generator-ignore git_push.sh .openapi-generator api test - -gen-rest-client: - -rm -r $(ADMIN_CLIENT_DIR) - mkdir -p $(ADMIN_CLIENT_DIR) - openapi-generator generate -i ./apidocs.json -g go -o $(ADMIN_CLIENT_DIR) \ - --git-user-id functionstream \ - --git-repo-id function-stream/$(ADMIN_CLIENT_DIR) \ - --package-name adminclient \ - --global-property apiDocs,apis,models,supportingFiles - rm -r $(addprefix $(ADMIN_CLIENT_DIR)/, $(FILES_TO_REMOVE)) - -proto: - for PROTO_FILE in $$(find . -name '*.proto'); do \ - echo "generating codes for $$PROTO_FILE"; \ - protoc \ - --go_out=. 
\ - --go_opt paths=source_relative \ - --plugin protoc-gen-go="${GOPATH}/bin/protoc-gen-go" \ - --go-grpc_out=. \ - --go-grpc_opt paths=source_relative \ - --plugin protoc-gen-go-grpc="${GOPATH}/bin/protoc-gen-go-grpc" \ - $$PROTO_FILE; \ - done +# Test RocksDB (automatically detect and install dependencies, generate correct link flags) +test-rocksdb: + @# Check and install dependencies + @chmod +x scripts/setup-rocksdb-deps.sh + @./scripts/setup-rocksdb-deps.sh check + @# Detect operating system and generate link flags + @ROCKSDB_LIB_DIR=$$(pwd)/bin/function-stream/lib/rocksdb/lib; \ + ROCKSDB_INCLUDE_DIR=$$(pwd)/bin/function-stream/lib/rocksdb/include; \ + TMP_LDFLAGS=$$(mktemp); \ + trap "rm -f $$TMP_LDFLAGS" EXIT; \ + ./scripts/setup-rocksdb-deps.sh ldflags "$$ROCKSDB_LIB_DIR" "$$TMP_LDFLAGS"; \ + CGO_LDFLAGS=$$(cat $$TMP_LDFLAGS); \ + if [ -f "$$ROCKSDB_LIB_DIR/librocksdb.a" ] && [ -d "$$ROCKSDB_INCLUDE_DIR" ]; then \ + echo "✅ Using RocksDB from project lib directory..."; \ + CGO_ENABLED=1 CGO_CFLAGS="-I$$ROCKSDB_INCLUDE_DIR" CGO_LDFLAGS="$$CGO_LDFLAGS" \ + go test -tags rocksdb -race ./... -timeout 10m; \ + else \ + echo "⚠️ RocksDB not found in project lib directory"; \ + echo "Auto-installing to project lib directory (recommended, ensures version compatibility)..."; \ + echo ""; \ + $(MAKE) setup-rocksdb || { \ + echo "❌ RocksDB installation failed"; \ + echo "Please run manually: make setup-rocksdb"; \ + exit 1; \ + }; \ + echo ""; \ + echo "✅ RocksDB installation completed, starting tests..."; \ + echo ""; \ + ./scripts/setup-rocksdb-deps.sh ldflags "$$ROCKSDB_LIB_DIR" "$$TMP_LDFLAGS"; \ + CGO_LDFLAGS=$$(cat $$TMP_LDFLAGS); \ + CGO_ENABLED=1 CGO_CFLAGS="-I$$ROCKSDB_INCLUDE_DIR" CGO_LDFLAGS="$$CGO_LDFLAGS" \ + go test -tags rocksdb -race ./... -timeout 10m; \ + fi -license: - ./license-checker/license-checker.sh +test-statestore: + go test ./fs/statestore/... -timeout 10m -v -run TestPebble -gen-changelog: - .chglog/gen-chg-log.sh +# Test RocksDB StateStore (automatically install RocksDB and dependencies, generate correct link flags) +test-statestore-rocksdb: setup-rocksdb + @# Check and install dependencies + @chmod +x scripts/setup-rocksdb-deps.sh + @./scripts/setup-rocksdb-deps.sh check + @echo "" + @echo "✅ RocksDB installed, starting tests..." + @echo "" + @# Detect operating system and generate link flags + @ROCKSDB_LIB_DIR=$$(pwd)/bin/function-stream/lib/rocksdb/lib; \ + ROCKSDB_INCLUDE_DIR=$$(pwd)/bin/function-stream/lib/rocksdb/include; \ + TMP_LDFLAGS=$$(mktemp); \ + trap "rm -f $$TMP_LDFLAGS" EXIT; \ + ./scripts/setup-rocksdb-deps.sh ldflags "$$ROCKSDB_LIB_DIR" "$$TMP_LDFLAGS"; \ + CGO_LDFLAGS=$$(cat $$TMP_LDFLAGS); \ + CGO_ENABLED=1 CGO_CFLAGS="-I$$ROCKSDB_INCLUDE_DIR" CGO_LDFLAGS="$$CGO_LDFLAGS" \ + go test -tags rocksdb ./fs/statestore/... 
-timeout 10m -v -run TestRocksDB + +test-statestore-all: + @echo "==========================================" + @echo "Testing Pebble (no installation required)" + @echo "==========================================" + @$(MAKE) test-statestore + @echo "" + @echo "==========================================" + @echo "Testing RocksDB (requires dependencies)" + @echo "==========================================" + @$(MAKE) test-statestore-rocksdb + +license: + @./license-checker/license-checker.sh diff --git a/common/constants.go b/common/constants.go index d6150160..4a0ad673 100644 --- a/common/constants.go +++ b/common/constants.go @@ -28,5 +28,6 @@ const ( RuntimeArchiveConfigKey = "archive" - StateStorePebble = "pebble" + StateStorePebble = "pebble" + StateStoreRocksDB = "rocksdb" ) diff --git a/common/wasm_utils/wasm_utils.go b/common/wasm_utils/wasm_utils.go index 60f7d004..ce83283e 100644 --- a/common/wasm_utils/wasm_utils.go +++ b/common/wasm_utils/wasm_utils.go @@ -44,3 +44,27 @@ func PtrSize(ptr, size uint32) uint64 { func ExtractPtrSize(ptrSize uint64) (uint32, uint32) { return uint32(ptrSize >> 32), uint32(ptrSize) } + +// BytesToPtr returns a pointer and size pair for the given byte slice in a way +// compatible with WebAssembly numeric types. +// The returned pointer aliases the slice hence the slice must be kept alive +// until ptr is no longer needed. +func BytesToPtr(b []byte) (uint32, uint32) { + if len(b) == 0 { + return 0, 0 + } + ptr := unsafe.Pointer(unsafe.SliceData(b)) + return uint32(uintptr(ptr)), uint32(len(b)) +} + +// PtrSize64 packs a 64-bit pointer and size into a uint128-like structure +// We use two uint64 values to represent a 128-bit pointer+size pair +type PtrSize64 struct { + Ptr uint64 + Size uint64 +} + +// ExtractPtrSize64 extracts pointer and size from a PtrSize64 struct +func ExtractPtrSize64(ps PtrSize64) (uint64, uint64) { + return ps.Ptr, ps.Size +} diff --git a/fs/api/func_ctx.go b/fs/api/func_ctx.go index 52a8a569..18067ae0 100644 --- a/fs/api/func_ctx.go +++ b/fs/api/func_ctx.go @@ -29,4 +29,5 @@ type FunctionContext interface { DeleteState(ctx context.Context, key string) error Write(record contube.Record) error GetConfig() map[string]string + GetStateStore() StateStore } diff --git a/fs/api/statestore.go b/fs/api/statestore.go index 859696bd..6bb8a057 100644 --- a/fs/api/statestore.go +++ b/fs/api/statestore.go @@ -31,6 +31,24 @@ type StateStore interface { GetState(ctx context.Context, key string) (value []byte, err error) ListStates(ctx context.Context, startInclusive string, endExclusive string) (keys []string, err error) DeleteState(ctx context.Context, key string) error + + // Basic operations - using 4 byte[] as Key + Put(ctx context.Context, keyGroup, key, namespace, userKey []byte, value []byte) error + Get(ctx context.Context, keyGroup, key, namespace, userKey []byte) ([]byte, error) + Delete(ctx context.Context, keyGroup, key, namespace, userKey []byte) error + DeleteAll(ctx context.Context, keyGroup, key, namespace []byte) error + Merge(ctx context.Context, keyGroup, key, namespace, userKey []byte, value []byte) error + + // Iterator operations for efficient prefix-based iteration + // NewIterator creates a new iterator with the specified prefix and returns its ID + NewIterator(prefix []byte) (int64, error) + // HasNext checks if the iterator has a next element + HasNext(id int64) (bool, error) + // Next returns the value of the next element and advances the iterator + Next(id int64) ([]byte, error) + // CloseIterator closes the 
iterator with the specified ID + CloseIterator(id int64) error + Close() error } diff --git a/fs/func_ctx_impl.go b/fs/func_ctx_impl.go index c57d6971..f78fd583 100644 --- a/fs/func_ctx_impl.go +++ b/fs/func_ctx_impl.go @@ -89,3 +89,7 @@ func (f *funcCtxImpl) GetConfig() map[string]string { func (f *funcCtxImpl) setSink(sink chan<- contube.Record) { f.sink = sink } + +func (f *funcCtxImpl) GetStateStore() api.StateStore { + return f.stateStore +} diff --git a/fs/runtime/wazero/wazero_runtime.go b/fs/runtime/wazero/wazero_runtime.go index df0971b6..b2ab0843 100644 --- a/fs/runtime/wazero/wazero_runtime.go +++ b/fs/runtime/wazero/wazero_runtime.go @@ -80,13 +80,13 @@ func (f *WazeroFunctionRuntimeFactory) NewFunctionRuntime(instance api.FunctionI rc *model.RuntimeConfig) (api.FunctionRuntime, error) { log := instance.Logger() r := wazero.NewRuntime(instance.Context()) - _, err := r.NewHostModuleBuilder("env").NewFunctionBuilder().WithFunc(func(ctx context.Context, - m wazero_api.Module, a, b, c, d uint32) { - log.Error(fmt.Errorf("abort(%d, %d, %d, %d)", a, b, c, d), "the function is calling abort") - }).Export("abort").Instantiate(instance.Context()) + + // Register env module with abort and stateStore functions + err := registerEnvModule(r, instance.Context(), instance, log) if err != nil { return nil, fmt.Errorf("error instantiating env module: %w", err) } + wasmLog := &logWriter{ log: log, } @@ -173,3 +173,328 @@ func (r *FunctionRuntime) Call(e contube.Record) (contube.Record, error) { func (r *FunctionRuntime) Stop() { r.stopFunc() } + +// helper functions to read/write byte arrays from/to wasm memory +// +// Note on pointer size compatibility: +// - Current WebAssembly implementations (wasm32) use 32-bit memory offsets, +// limiting addressable memory to 4GB. The wazero runtime's Memory interface +// uses uint32 for addresses. +// - Future WebAssembly specifications may introduce wasm64 with 64-bit addresses. +// - The current implementation uses uint32 for memory operations, which is +// compatible with wasm32 and the wazero Memory API. +// - For wasm64 compatibility in the future, these functions would need to be +// modified to use uint64 for address operations when wazero adds support. 
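To make the packed `ptr<<32 | size` ABI concrete before the host-side helpers, here is a hypothetical guest-side sketch (TinyGo-style; not part of this patch). Only the packing convention comes from `common/wasm_utils.PtrSize` and the import names from the `Export()` calls below; `pack`, `lookup`, and the error handling are invented for illustration:

```go
// Hypothetical TinyGo guest for the "env" host module registered below.
package main

import "unsafe"

//go:wasmimport env stateStoreGet
func stateStoreGet(keyGroup, key, namespace, userKey uint64) uint64

// pack mirrors wasm_utils.PtrSize: high 32 bits = pointer, low 32 bits = size.
func pack(b []byte) uint64 {
	if len(b) == 0 {
		return 0
	}
	ptr := uint32(uintptr(unsafe.Pointer(&b[0])))
	return uint64(ptr)<<32 | uint64(uint32(len(b)))
}

// lookup fetches a value via the host; a zero result means not found or error
// (the host logs the cause and returns 0 in both cases).
func lookup(namespace, userKey []byte) []byte {
	res := stateStoreGet(0, 0, pack(namespace), pack(userKey))
	if res == 0 {
		return nil
	}
	ptr, size := uint32(res>>32), uint32(res)
	// The host wrote the value into this module's linear memory.
	return unsafe.Slice((*byte)(unsafe.Pointer(uintptr(ptr))), size)
}

func main() {}
```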
+func readBytesFromMemory(module wazero_api.Module, ptr, size uint32) ([]byte, error) {
+	if size == 0 {
+		return nil, nil
+	}
+	mem := module.Memory()
+	if mem == nil {
+		return nil, fmt.Errorf("memory is not available")
+	}
+	data, ok := mem.Read(ptr, size)
+	if !ok {
+		return nil, fmt.Errorf("failed to read memory at offset %d, size %d", ptr, size)
+	}
+	result := make([]byte, size)
+	copy(result, data)
+	return result, nil
+}
+
+func writeBytesToMemory(module wazero_api.Module, data []byte) (uint32, error) {
+	if len(data) == 0 {
+		return 0, nil
+	}
+
+	// Check for potential overflow: max uint32 is 4GB-1
+	if uint64(len(data)) > uint64(^uint32(0)) {
+		return 0, fmt.Errorf("data size exceeds wasm32 memory limit: %d bytes", len(data))
+	}
+
+	mem := module.Memory()
+	if mem == nil {
+		return 0, fmt.Errorf("memory is not available")
+	}
+	// Memory.Size returns the current size in bytes; it is always a multiple
+	// of the 64 KiB wasm page size.
+	currentSize := mem.Size()
+	neededSize := uint32(len(data))
+
+	// Grow by just enough pages to hold the new data, rounding up to whole
+	// pages. Note: each call grows the module memory; pages are not reclaimed.
+	pagesToAdd := (neededSize + 65535) / 65536
+	_, ok := mem.Grow(pagesToAdd)
+	if !ok {
+		return 0, fmt.Errorf("failed to grow memory")
+	}
+
+	// Write at the end of the original memory
+	offset := currentSize
+	ok = mem.Write(offset, data)
+	if !ok {
+		return 0, fmt.Errorf("failed to write memory at offset %d", offset)
+	}
+	return offset, nil
+}
+
+// registerEnvModule registers both abort and StateStore functions to the wasm env module
+func registerEnvModule(runtime wazero.Runtime, ctx context.Context, instance api.FunctionInstance, log *common.Logger) error {
+	stateStore := instance.FunctionContext().GetStateStore()
+
+	// Helper to read byte array from wasm memory: (ptr, size uint32) -> bytes
+	readBytes := func(module wazero_api.Module, ptrSize uint64) ([]byte, error) {
+		ptr := uint32(ptrSize >> 32)
+		size := uint32(ptrSize)
+		return readBytesFromMemory(module, ptr, size)
+	}
+
+	// Helper to write byte array to wasm memory: bytes -> (ptr, size uint32)
+	writeBytes := func(module wazero_api.Module, data []byte) (uint64, error) {
+		ptr, err := writeBytesToMemory(module, data)
+		if err != nil {
+			return 0, err
+		}
+		return uint64(ptr)<<32 | uint64(len(data)), nil
+	}
+
+	builder := runtime.NewHostModuleBuilder("env")
+
+	// Register abort function
+	builder = builder.NewFunctionBuilder().WithFunc(func(ctx context.Context,
+		m wazero_api.Module, a, b, c, d uint32) {
+		log.Error(fmt.Errorf("abort(%d, %d, %d, %d)", a, b, c, d), "the function is calling abort")
+	}).Export("abort")
+
+	// Put(keyGroup, key, namespace, userKey ptrSize, value ptrSize) -> uint64(ptr, size)
+	builder = builder.NewFunctionBuilder().WithFunc(func(module wazero_api.Module, kg, k, ns, ukPtrSize, vPtrSize uint64) (resultPtrSize uint64) {
+		keyGroup, err := readBytes(module, kg)
+		if err != nil {
+			log.Error(err, "failed to read keyGroup")
+			return 0
+		}
+		key, err := readBytes(module, k)
+		if err != nil {
+			log.Error(err, "failed to read key")
+			return 0
+		}
+		namespace, err := readBytes(module, ns)
+		if err != nil {
+			log.Error(err, "failed to read namespace")
+			return 0
+		}
+		userKey, err := readBytes(module, ukPtrSize)
+		if err != nil {
+			log.Error(err, "failed to read userKey")
+			return 0
+		}
+		value, err := readBytes(module, vPtrSize)
+		if err != nil {
+			log.Error(err, "failed to read value")
+			return 0
+		}
+
+		if err := stateStore.Put(ctx, keyGroup, key, namespace, userKey, value); err != nil {
+			
log.Error(err, "StateStore.Put failed") + return 0 + } + + // Return success (empty result) + resultPtrSize, _ = writeBytes(module, []byte{}) + return resultPtrSize + }).Export("stateStorePut") + + // Get(keyGroup, key, namespace, userKey ptrSize) -> uint64(ptr, size) + builder = builder.NewFunctionBuilder().WithFunc(func(module wazero_api.Module, kg, k, ns, ukPtrSize uint64) (resultPtrSize uint64) { + keyGroup, err := readBytes(module, kg) + if err != nil { + log.Error(err, "failed to read keyGroup") + return 0 + } + key, err := readBytes(module, k) + if err != nil { + log.Error(err, "failed to read key") + return 0 + } + namespace, err := readBytes(module, ns) + if err != nil { + log.Error(err, "failed to read namespace") + return 0 + } + userKey, err := readBytes(module, ukPtrSize) + if err != nil { + log.Error(err, "failed to read userKey") + return 0 + } + + value, err := stateStore.Get(ctx, keyGroup, key, namespace, userKey) + if err != nil { + log.Error(err, "StateStore.Get failed") + return 0 + } + + resultPtrSize, err = writeBytes(module, value) + if err != nil { + log.Error(err, "failed to write result") + return 0 + } + return resultPtrSize + }).Export("stateStoreGet") + + // Delete(keyGroup, key, namespace, userKey ptrSize) -> uint64(ptr, size) + builder = builder.NewFunctionBuilder().WithFunc(func(module wazero_api.Module, kg, k, ns, ukPtrSize uint64) (resultPtrSize uint64) { + keyGroup, err := readBytes(module, kg) + if err != nil { + log.Error(err, "failed to read keyGroup") + return 0 + } + key, err := readBytes(module, k) + if err != nil { + log.Error(err, "failed to read key") + return 0 + } + namespace, err := readBytes(module, ns) + if err != nil { + log.Error(err, "failed to read namespace") + return 0 + } + userKey, err := readBytes(module, ukPtrSize) + if err != nil { + log.Error(err, "failed to read userKey") + return 0 + } + + if err := stateStore.Delete(ctx, keyGroup, key, namespace, userKey); err != nil { + log.Error(err, "StateStore.Delete failed") + return 0 + } + + resultPtrSize, _ = writeBytes(module, []byte{}) + return resultPtrSize + }).Export("stateStoreDelete") + + // DeleteAll(keyGroup, key, namespace) -> uint64(ptr, size) + builder = builder.NewFunctionBuilder().WithFunc(func(module wazero_api.Module, kg, k, ns uint64) (resultPtrSize uint64) { + keyGroup, err := readBytes(module, kg) + if err != nil { + log.Error(err, "failed to read keyGroup") + return 0 + } + key, err := readBytes(module, k) + if err != nil { + log.Error(err, "failed to read key") + return 0 + } + namespace, err := readBytes(module, ns) + if err != nil { + log.Error(err, "failed to read namespace") + return 0 + } + + if err := stateStore.DeleteAll(ctx, keyGroup, key, namespace); err != nil { + log.Error(err, "StateStore.DeleteAll failed") + return 0 + } + + resultPtrSize, _ = writeBytes(module, []byte{}) + return resultPtrSize + }).Export("stateStoreDeleteAll") + + // Merge(keyGroup, key, namespace, userKey ptrSize, value ptrSize) -> uint64(ptr, size) + builder = builder.NewFunctionBuilder().WithFunc(func(module wazero_api.Module, kg, k, ns, ukPtrSize, vPtrSize uint64) (resultPtrSize uint64) { + keyGroup, err := readBytes(module, kg) + if err != nil { + log.Error(err, "failed to read keyGroup") + return 0 + } + key, err := readBytes(module, k) + if err != nil { + log.Error(err, "failed to read key") + return 0 + } + namespace, err := readBytes(module, ns) + if err != nil { + log.Error(err, "failed to read namespace") + return 0 + } + userKey, err := readBytes(module, ukPtrSize) + if 
err != nil { + log.Error(err, "failed to read userKey") + return 0 + } + value, err := readBytes(module, vPtrSize) + if err != nil { + log.Error(err, "failed to read value") + return 0 + } + + if err := stateStore.Merge(ctx, keyGroup, key, namespace, userKey, value); err != nil { + log.Error(err, "StateStore.Merge failed") + return 0 + } + + resultPtrSize, _ = writeBytes(module, []byte{}) + return resultPtrSize + }).Export("stateStoreMerge") + + // NewIterator(prefix ptrSize) -> iteratorId + builder = builder.NewFunctionBuilder().WithFunc(func(module wazero_api.Module, prefixPtrSize uint64) (iteratorId int64) { + prefix, err := readBytes(module, prefixPtrSize) + if err != nil { + log.Error(err, "failed to read prefix") + return 0 + } + + iterID, err := stateStore.NewIterator(prefix) + if err != nil { + log.Error(err, "StateStore.NewIterator failed") + return 0 + } + + return iterID + }).Export("stateStoreNewIterator") + + // HasNext(iteratorId) -> hasNext + builder = builder.NewFunctionBuilder().WithFunc(func(module wazero_api.Module, iteratorId int64) (hasNext uint32) { + has, err := stateStore.HasNext(iteratorId) + if err != nil { + log.Error(err, "StateStore.HasNext failed") + return 0 + } + + if has { + return 1 + } + return 0 + }).Export("stateStoreIteratorHasNext") + + // Next(iteratorId) -> uint64(ptr, size) + builder = builder.NewFunctionBuilder().WithFunc(func(module wazero_api.Module, iteratorId int64) (resultPtrSize uint64) { + value, err := stateStore.Next(iteratorId) + if err != nil { + log.Error(err, "StateStore.Next failed") + return 0 + } + + resultPtrSize, err = writeBytes(module, value) + if err != nil { + log.Error(err, "failed to write result") + return 0 + } + return resultPtrSize + }).Export("stateStoreIteratorNext") + + // CloseIterator(iteratorId) -> uint64(ptr, size) + builder = builder.NewFunctionBuilder().WithFunc(func(module wazero_api.Module, iteratorId int64) (resultPtrSize uint64) { + if err := stateStore.CloseIterator(iteratorId); err != nil { + log.Error(err, "StateStore.CloseIterator failed") + return 0 + } + + resultPtrSize, _ = writeBytes(module, []byte{}) + return resultPtrSize + }).Export("stateStoreIteratorClose") + + _, err := builder.Instantiate(ctx) + return err +} diff --git a/fs/statestore/key_utils.go b/fs/statestore/key_utils.go new file mode 100644 index 00000000..414e1762 --- /dev/null +++ b/fs/statestore/key_utils.go @@ -0,0 +1,75 @@ +// Package statestore - key utility functions shared between Pebble and RocksDB implementations + +/* + * Copyright 2024 Function Stream Org. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package statestore + +// buildKey builds a composite key from keyGroup, key, namespace, and userKey +func buildKey(keyGroup, key, namespace, userKey []byte) []byte { + newKey := make([]byte, 0, len(keyGroup)+len(key)+len(namespace)+len(userKey)) + newKey = append(newKey, keyGroup...) + newKey = append(newKey, key...) + newKey = append(newKey, namespace...) + newKey = append(newKey, userKey...) 
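+	// NOTE: the four components are concatenated with no separators or length
+	// prefixes, so distinct tuples can yield the same composite key (for
+	// example ("ab","c") and ("a","bc")); callers must keep the component
+	// boundaries unambiguous, e.g. by using fixed-width components.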
+	return newKey
+}
+
+// incrementKey increments a key by 1 in lexicographic order
+// This is used to create an end key for prefix-based range deletion
+// Examples:
+//   - [0x01, 0x02, 0x03] -> [0x01, 0x02, 0x04]
+//   - [0x01, 0x02, 0xFF] -> [0x01, 0x03, 0x00]
+//   - [0xFF, 0xFF, 0xFF] -> [0xFF, 0xFF, 0xFF, 0x01]
+func incrementKey(key []byte) []byte {
+	if len(key) == 0 {
+		return []byte{0x01}
+	}
+
+	// Create a copy to avoid modifying the original
+	result := make([]byte, len(key))
+	copy(result, key)
+
+	// Find the last non-0xFF byte and increment it
+	i := len(result) - 1
+	for i >= 0 && result[i] == 0xFF {
+		result[i] = 0
+		i--
+	}
+
+	if i >= 0 {
+		// Found a byte to increment
+		result[i]++
+	} else {
+		// All bytes are 0xFF: restore the original bytes and append 0x01 so the
+		// result ([0xFF, ..., 0xFF, 0x01]) is strictly greater than the input.
+		// Note this is still not a valid exclusive upper bound for an all-0xFF
+		// prefix, which is why DeleteAll special-cases it via isAll0xFF.
+		copy(result, key)
+		result = append(result, 0x01)
+	}
+
+	return result
+}
+
+// isAll0xFF checks if all bytes in the key are 0xFF
+func isAll0xFF(key []byte) bool {
+	if len(key) == 0 {
+		return false
+	}
+	for _, b := range key {
+		if b != 0xFF {
+			return false
+		}
+	}
+	return true
+}

diff --git a/fs/statestore/pebble.go b/fs/statestore/pebble.go
index 974767ab..de24272f 100644
--- a/fs/statestore/pebble.go
+++ b/fs/statestore/pebble.go
@@ -17,9 +17,11 @@ package statestore
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"os"
+	"sync"
 
 	"github.com/functionstream/function-stream/common/config"
 	"github.com/functionstream/function-stream/common/model"
@@ -34,7 +36,11 @@ type PebbleStateStoreFactory struct {
 }
 
 type PebbleStateStoreFactoryConfig struct {
-	DirName string `json:"dir_name" validate:"required"`
+	DirName               string `json:"dir_name" validate:"required"`
+	MaxOpenFiles          int    `json:"max-open-files" mapstructure:"max-open-files"`
+	L0CompactionThreshold int    `json:"l0-compaction-threshold" mapstructure:"l0-compaction-threshold"`
+	L0StopWritesThreshold int    `json:"l0-stop-writes-threshold" mapstructure:"l0-stop-writes-threshold"`
+	LBaseMaxBytes         int64  `json:"l-base-max-bytes" mapstructure:"l-base-max-bytes"`
 }
 
 type PebbleStateStoreConfig struct {
@@ -47,9 +53,31 @@ func NewPebbleStateStoreFactory(config config.ConfigMap) (api.StateStoreFactory,
 	if err != nil {
 		return nil, fmt.Errorf("failed to parse config: %w", err)
 	}
-	db, err := pebble.Open(c.DirName, &pebble.Options{})
+
+	// Create Pebble options and apply configuration
+	opts := &pebble.Options{}
+
+	if c.MaxOpenFiles > 0 {
+		opts.MaxOpenFiles = c.MaxOpenFiles
+	}
+	if c.L0CompactionThreshold > 0 {
+		opts.L0CompactionThreshold = c.L0CompactionThreshold
+	}
+	if c.L0StopWritesThreshold > 0 {
+		opts.L0StopWritesThreshold = c.L0StopWritesThreshold
+	}
+	if c.LBaseMaxBytes > 0 {
+		opts.LBaseMaxBytes = c.LBaseMaxBytes
+	}
+
+	// Ensure directory exists
+	if err := os.MkdirAll(c.DirName, 0755); err != nil {
+		return nil, fmt.Errorf("failed to create directory: %w", err)
+	}
+
+	db, err := pebble.Open(c.DirName, opts)
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("failed to open pebble: %w", err)
 	}
 	return &PebbleStateStoreFactory{db: db}, nil
 }
@@ -69,8 +97,10 @@ func NewDefaultPebbleStateStoreFactory() (api.StateStoreFactory, error) {
 func (fact *PebbleStateStoreFactory) NewStateStore(f *model.Function) (api.StateStore, error) {
 	if f == nil {
 		return &PebbleStateStore{
-			db:        fact.db,
-			keyPrefix: "",
+			db:             fact.db,
+			keyPrefix:      []byte{},
+			iterators:      make(map[int64]*pebbleIteratorInfo),
+			nextIteratorID: 1,
 		}, nil
 	}
 	c := &PebbleStateStoreConfig{}
 	if err != nil {
 		return nil, fmt.Errorf("failed to parse 
config: %w", err) } + var keyPrefix []byte + if c.KeyPrefix != "" { + keyPrefix = []byte(c.KeyPrefix) + } else { + keyPrefix = []byte(f.Name) + } return &PebbleStateStore{ - db: fact.db, - keyPrefix: c.KeyPrefix, + db: fact.db, + keyPrefix: keyPrefix, + iterators: make(map[int64]*pebbleIteratorInfo), + nextIteratorID: 1, }, nil } @@ -88,24 +126,35 @@ func (fact *PebbleStateStoreFactory) Close() error { return fact.db.Close() } +type pebbleIteratorInfo struct { + iter *pebble.Iterator + prefix []byte +} + type PebbleStateStore struct { - db *pebble.DB - keyPrefix string + db *pebble.DB + keyPrefix []byte + iterators map[int64]*pebbleIteratorInfo + nextIteratorID int64 + iteratorMu sync.RWMutex } -func (s *PebbleStateStore) getKey(key string) string { - return s.keyPrefix + key +func (s *PebbleStateStore) getKey(key string) []byte { + result := make([]byte, 0, len(s.keyPrefix)+len(key)) + result = append(result, s.keyPrefix...) + result = append(result, []byte(key)...) + return result } func (s *PebbleStateStore) PutState(ctx context.Context, key string, value []byte) error { - if err := s.db.Set([]byte(s.getKey(key)), value, pebble.NoSync); err != nil { + if err := s.db.Set(s.getKey(key), value, pebble.NoSync); err != nil { return err } return nil } func (s *PebbleStateStore) GetState(ctx context.Context, key string) ([]byte, error) { - value, closer, err := s.db.Get([]byte(s.getKey(key))) + value, closer, err := s.db.Get(s.getKey(key)) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return nil, api.ErrNotFound @@ -122,9 +171,25 @@ func (s *PebbleStateStore) GetState(ctx context.Context, key string) ([]byte, er func (s *PebbleStateStore) ListStates( ctx context.Context, startInclusive string, endExclusive string) ([]string, error) { + // Build bounds - empty string means no bound + var lowerBound, upperBound []byte + + if startInclusive != "" { + lowerBound = s.getKey(startInclusive) + } else if len(s.keyPrefix) > 0 { + // Empty startInclusive but has keyPrefix - start from prefix + lowerBound = s.keyPrefix + } + // else: lowerBound is nil, which means start from beginning + + if endExclusive != "" { + upperBound = s.getKey(endExclusive) + } + // else: upperBound is nil, which means no upper bound + iter, err := s.db.NewIter(&pebble.IterOptions{ - LowerBound: []byte(s.getKey(startInclusive)), - UpperBound: []byte(s.getKey(endExclusive)), + LowerBound: lowerBound, + UpperBound: upperBound, }) if err != nil { return nil, err @@ -132,20 +197,412 @@ func (s *PebbleStateStore) ListStates( defer func(iter *pebble.Iterator) { _ = iter.Close() }(iter) + var keys []string + seenKeys := make(map[string]bool) + for iter.First(); iter.Valid(); iter.Next() { - keys = append(keys, string(iter.Key())) + keyBytes := iter.Key() + keyStr := string(keyBytes) + + // Remove keyPrefix if present to return the original key name + var originalKey string + if len(s.keyPrefix) > 0 && bytes.HasPrefix(keyBytes, s.keyPrefix) { + // Extract the key after the prefix + originalKey = keyStr[len(s.keyPrefix):] + } else { + originalKey = keyStr + } + + // Apply range filtering on the original key (not the prefixed key) + if startInclusive != "" && originalKey < startInclusive { + continue + } + if endExclusive != "" && originalKey >= endExclusive { + continue + } + + // Avoid duplicates + if !seenKeys[originalKey] { + keys = append(keys, originalKey) + seenKeys[originalKey] = true + } } + return keys, nil } func (s *PebbleStateStore) DeleteState(ctx context.Context, key string) error { - if err := 
s.db.Delete([]byte(s.getKey(key)), pebble.NoSync); err != nil { + if err := s.db.Delete(s.getKey(key), pebble.NoSync); err != nil { return err } return nil } +// Put stores a key-value pair +func (pb *PebbleStateStore) Put(ctx context.Context, keyGroup, key, namespace, userKey []byte, value []byte) error { + keyBytes := buildKey(keyGroup, key, namespace, userKey) + if len(pb.keyPrefix) > 0 { + result := make([]byte, 0, len(pb.keyPrefix)+len(keyBytes)) + result = append(result, pb.keyPrefix...) + result = append(result, keyBytes...) + keyBytes = result + } + + // Directly store value without checking if key exists + err := pb.db.Set(keyBytes, value, pebble.Sync) + if err != nil { + return fmt.Errorf("failed to put key: %w", err) + } + + return nil +} + +// Get retrieves a value +func (pb *PebbleStateStore) Get(ctx context.Context, keyGroup, key, namespace, userKey []byte) ([]byte, error) { + keyBytes := buildKey(keyGroup, key, namespace, userKey) + if len(pb.keyPrefix) > 0 { + result := make([]byte, 0, len(pb.keyPrefix)+len(keyBytes)) + result = append(result, pb.keyPrefix...) + result = append(result, keyBytes...) + keyBytes = result + } + value, closer, err := pb.db.Get(keyBytes) + if err != nil { + if err == pebble.ErrNotFound { + return nil, api.ErrNotFound + } + return nil, fmt.Errorf("failed to get key: %w", err) + } + defer closer.Close() + + // Return a copy of the data + result := make([]byte, len(value)) + copy(result, value) + + return result, nil +} + +// Delete removes a key +func (pb *PebbleStateStore) Delete(ctx context.Context, keyGroup, key, namespace, userKey []byte) error { + keyBytes := buildKey(keyGroup, key, namespace, userKey) + if len(pb.keyPrefix) > 0 { + result := make([]byte, 0, len(pb.keyPrefix)+len(keyBytes)) + result = append(result, pb.keyPrefix...) + result = append(result, keyBytes...) + keyBytes = result + } + + // Delete the key + err := pb.db.Delete(keyBytes, pebble.Sync) + if err != nil { + return fmt.Errorf("failed to delete key: %w", err) + } + + return nil +} + +// DeleteAll deletes all keys under the specified prefix - uses range deletion +// +// Prefix matching logic: +// - Start key: keyPrefix + buildKey(keyGroup, key, namespace, []) +// This is the exact prefix we want to delete +// - End key: incrementKey(startKey) +// This creates the smallest key that is greater than any key starting with the prefix +// +// How it works: +// +// In lexicographic byte order: +// - Any key starting with prefix P: [P] < [P, ...] < incrementKey(P) +// - Pebble's DeleteRange(start, end) deletes all keys where start <= key < end +// +// Examples: +// +// Prefix: [0x01, 0x02, 0x03] +// - startKey = [0x01, 0x02, 0x03] +// - endKey = incrementKey([0x01, 0x02, 0x03]) = [0x01, 0x02, 0x04] +// - Deletes: [0x01, 0x02, 0x03], [0x01, 0x02, 0x03, 0x00], [0x01, 0x02, 0x03, 0xFF, ...] +// - Does NOT delete: [0x01, 0x02, 0x04] (>= endKey) +// +// Prefix: [0x01, 0x02, 0xFF] (handles overflow correctly) +// - startKey = [0x01, 0x02, 0xFF] +// - endKey = incrementKey([0x01, 0x02, 0xFF]) = [0x01, 0x03, 0x00] +// - Correctly handles 0xFF overflow +func (pb *PebbleStateStore) DeleteAll(ctx context.Context, keyGroup, key, namespace []byte) error { + // Create start key: keyPrefix + buildKey(keyGroup, key, namespace, []) + // This is the exact prefix we want to delete + startKeyBytes := buildKey(keyGroup, key, namespace, []byte{}) + if len(pb.keyPrefix) > 0 { + result := make([]byte, 0, len(pb.keyPrefix)+len(startKeyBytes)) + result = append(result, pb.keyPrefix...) 
+ result = append(result, startKeyBytes...) + startKeyBytes = result + } + + if len(startKeyBytes) == 0 { + // Empty key case - shouldn't happen + return fmt.Errorf("empty key prefix") + } + + // Check if prefix is all 0xFF - this is a special case where range deletion + // would miss some keys, so we use iterator-based deletion instead + if isAll0xFF(startKeyBytes) { + // Use iterator-based deletion for all-0xFF prefix + // Note: We cannot use an upper bound here because all-0xFF prefix can extend + // to arbitrary length. We must iterate and check prefix manually. + iter, err := pb.db.NewIter(&pebble.IterOptions{ + LowerBound: startKeyBytes, + }) + if err != nil { + return fmt.Errorf("failed to create iterator: %w", err) + } + defer iter.Close() + + batch := pb.db.NewBatch() + defer batch.Close() + + // Optimized: batch deletions and commit once + // Iterate and delete all keys with the prefix + for iter.First(); iter.Valid(); iter.Next() { + keyBytes := iter.Key() + // Check if key still starts with prefix (this is the only way to check + // since we can't use an upper bound for all-0xFF prefix) + if !bytes.HasPrefix(keyBytes, startKeyBytes) { + break + } + // Key is owned by iterator, Pebble batch.Delete makes a copy internally + if err := batch.Delete(keyBytes, nil); err != nil { + return fmt.Errorf("failed to batch delete: %w", err) + } + } + + if err := batch.Commit(pebble.Sync); err != nil { + return fmt.Errorf("failed to commit batch: %w", err) + } + return nil + } + + // Create end key by incrementing the prefix + // This creates the smallest key > any key starting with startKeyBytes + // Example: [0x01, 0x02, 0x03] -> [0x01, 0x02, 0x04] + // This ensures all keys with prefix [0x01, 0x02, 0x03] are deleted + // + // Why incrementKey instead of finding the last actual key? + // 1. Performance: incrementKey is O(1), finding last key requires O(n) iteration + // 2. Correctness: incrementKey is mathematically correct (upper bound), independent of actual data + // 3. Dynamic data: Works correctly even if keys are inserted concurrently + // 4. Engine optimization: DeleteRange can use engine-level optimizations without reading all keys + // Note: If we already iterated all keys, we should delete them directly (as in all-0xFF case) + endKeyBytes := incrementKey(startKeyBytes) + + // Use Pebble's range deletion feature + // DeleteRange(start, end) deletes all keys where start <= key < end + if err := pb.db.DeleteRange(startKeyBytes, endKeyBytes, pebble.Sync); err != nil { + return fmt.Errorf("failed to delete range: %w", err) + } + + return nil +} + +// Merge merges values using Pebble's native Merge operation +// Note: Pebble requires a MergeOperator to be configured when opening the database +// to properly merge values. Without a MergeOperator, Pebble's default behavior is to append. +// To match RocksDB's behavior (where Merge behaves like Put without MergeOperator), +// we implement a fallback: if no existing value, store new value; if exists, replace it (Put behavior). +func (pb *PebbleStateStore) Merge(ctx context.Context, keyGroup, key, namespace, userKey []byte, value []byte) error { + keyBytes := buildKey(keyGroup, key, namespace, userKey) + if len(pb.keyPrefix) > 0 { + result := make([]byte, 0, len(pb.keyPrefix)+len(keyBytes)) + result = append(result, pb.keyPrefix...) + result = append(result, keyBytes...) 
+ keyBytes = result + } + + // Check if a value already exists for this key + _, closer, err := pb.db.Get(keyBytes) + if err == pebble.ErrNotFound { + // No existing value, just store the new value (same as first merge) + err = pb.db.Set(keyBytes, value, pebble.Sync) + if err != nil { + return fmt.Errorf("failed to set value: %w", err) + } + return nil + } + if err != nil { + return fmt.Errorf("failed to check existing value: %w", err) + } + closer.Close() + + // Value exists - without a MergeOperator, Pebble's Merge would append + // To match expected behavior (like Put/replace), we use Set instead + // This matches RocksDB's behavior when no MergeOperator is configured + err = pb.db.Set(keyBytes, value, pebble.Sync) + if err != nil { + return fmt.Errorf("failed to merge (replace) value: %w", err) + } + + return nil +} + func (s *PebbleStateStore) Close() error { + // Close all active iterators + s.iteratorMu.Lock() + for id, info := range s.iterators { + if info.iter != nil { + _ = info.iter.Close() + } + delete(s.iterators, id) + } + s.iteratorMu.Unlock() + return nil +} + +// NewIterator creates a new iterator with the specified prefix and returns its ID. +// The iterator will iterate over all keys that start with the given prefix. +func (s *PebbleStateStore) NewIterator(prefix []byte) (int64, error) { + s.iteratorMu.Lock() + defer s.iteratorMu.Unlock() + + // Generate a new iterator ID + id := s.nextIteratorID + s.nextIteratorID++ + + // Add key prefix if present + var fullPrefix []byte + if len(s.keyPrefix) > 0 { + fullPrefix = make([]byte, 0, len(s.keyPrefix)+len(prefix)) + fullPrefix = append(fullPrefix, s.keyPrefix...) + fullPrefix = append(fullPrefix, prefix...) + } else { + fullPrefix = make([]byte, len(prefix)) + copy(fullPrefix, prefix) + } + + // Create a new iterator + iter, err := s.db.NewIter(&pebble.IterOptions{ + LowerBound: fullPrefix, + }) + if err != nil { + return 0, fmt.Errorf("failed to create iterator: %w", err) + } + + // Store prefix for later validation + prefixCopy := make([]byte, len(prefix)) + copy(prefixCopy, prefix) + + // Seek to the prefix + if len(fullPrefix) > 0 { + iter.SeekGE(fullPrefix) + } else { + // If prefix is empty, seek to the beginning + iter.First() + } + + // Store iterator info + s.iterators[id] = &pebbleIteratorInfo{ + iter: iter, + prefix: prefixCopy, + } + + return id, nil +} + +// HasNext checks if the iterator has a next element. +// It returns true if there is a next element, false otherwise. +func (s *PebbleStateStore) HasNext(id int64) (bool, error) { + info, exists := s.iterators[id] + if !exists { + return false, fmt.Errorf("iterator with id %d not found", id) + } + + if info.iter == nil { + return false, fmt.Errorf("iterator with id %d is closed", id) + } + + // Check if iterator is valid + if !info.iter.Valid() { + return false, nil + } + + // If prefix is specified, check if current key still has the prefix + if len(info.prefix) > 0 { + keyBytes := info.iter.Key() + // Check if key still has the prefix (considering keyPrefix) + fullPrefix := info.prefix + if len(s.keyPrefix) > 0 { + fullPrefix = make([]byte, 0, len(s.keyPrefix)+len(info.prefix)) + fullPrefix = append(fullPrefix, s.keyPrefix...) + fullPrefix = append(fullPrefix, info.prefix...) + } + if !bytes.HasPrefix(keyBytes, fullPrefix) { + return false, nil + } + } + + return true, nil +} + +// Next returns the value of the next element and advances the iterator. +// It returns the value as a byte slice. 
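+// The returned slice is a copy, so it remains valid after the iterator advances.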
+func (s *PebbleStateStore) Next(id int64) ([]byte, error) { + info, exists := s.iterators[id] + if !exists { + return nil, fmt.Errorf("iterator with id %d not found", id) + } + + if info.iter == nil { + return nil, fmt.Errorf("iterator with id %d is closed", id) + } + + // Check if iterator is valid + if !info.iter.Valid() { + return nil, fmt.Errorf("iterator has no more elements") + } + + // If prefix is specified, verify current key has the prefix + if len(info.prefix) > 0 { + keyBytes := info.iter.Key() + fullPrefix := info.prefix + if len(s.keyPrefix) > 0 { + fullPrefix = make([]byte, 0, len(s.keyPrefix)+len(info.prefix)) + fullPrefix = append(fullPrefix, s.keyPrefix...) + fullPrefix = append(fullPrefix, info.prefix...) + } + if !bytes.HasPrefix(keyBytes, fullPrefix) { + return nil, fmt.Errorf("iterator has no more elements") + } + } + + // Get the value + valueBytes := info.iter.Value() + + // Return a copy of the data since iterator data may be invalidated on Next() + result := make([]byte, len(valueBytes)) + copy(result, valueBytes) + + // Advance iterator + info.iter.Next() + + return result, nil +} + +// CloseIterator closes the iterator with the specified ID. +func (s *PebbleStateStore) CloseIterator(id int64) error { + s.iteratorMu.Lock() + defer s.iteratorMu.Unlock() + + info, exists := s.iterators[id] + if !exists { + return fmt.Errorf("iterator with id %d not found", id) + } + + // Close the iterator + if info.iter != nil { + _ = info.iter.Close() + } + + // Remove from map + delete(s.iterators, id) + return nil } diff --git a/fs/statestore/pebble_test.go b/fs/statestore/pebble_test.go index 725f80f8..8dffcb1a 100644 --- a/fs/statestore/pebble_test.go +++ b/fs/statestore/pebble_test.go @@ -18,27 +18,670 @@ package statestore_test import ( "context" + "fmt" + "os" "testing" + "github.com/functionstream/function-stream/common/config" + "github.com/functionstream/function-stream/common/model" "github.com/functionstream/function-stream/fs/api" "github.com/functionstream/function-stream/fs/statestore" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) -func TestPebbleStateStore(t *testing.T) { +func setupPebbleTest(t *testing.T) (api.StateStoreFactory, func()) { + dir, err := os.MkdirTemp("", "pebble_test_*") + require.NoError(t, err) + + cfgMap := config.ConfigMap{ + "dir_name": dir, + } + + factory, err := statestore.NewPebbleStateStoreFactory(cfgMap) + require.NoError(t, err) + + cleanup := func() { + err := factory.Close() + if err != nil { + t.Logf("Error closing factory: %v", err) + } + err = os.RemoveAll(dir) + if err != nil { + t.Logf("Error removing test directory: %v", err) + } + } + + return factory, cleanup +} + +func TestPebbleStateStore_BasicOperations(t *testing.T) { + factory, cleanup := setupPebbleTest(t) + defer cleanup() + + ctx := context.Background() + store, err := factory.NewStateStore(nil) + require.NoError(t, err) + defer store.Close() + + // Test PutState and GetState + err = store.PutState(ctx, "key1", []byte("value1")) + assert.NoError(t, err) + + value, err := store.GetState(ctx, "key1") + assert.NoError(t, err) + assert.Equal(t, []byte("value1"), value) + + // Test GetState for non-existent key + _, err = store.GetState(ctx, "nonexistent") + assert.ErrorIs(t, err, api.ErrNotFound) + + // Test update + err = store.PutState(ctx, "key1", []byte("value1_updated")) + assert.NoError(t, err) + + value, err = store.GetState(ctx, "key1") + assert.NoError(t, err) + assert.Equal(t, []byte("value1_updated"), value) + + // Test 
DeleteState + err = store.DeleteState(ctx, "key1") + assert.NoError(t, err) + + _, err = store.GetState(ctx, "key1") + assert.ErrorIs(t, err, api.ErrNotFound) +} + +func TestPebbleStateStore_ListStates(t *testing.T) { + factory, cleanup := setupPebbleTest(t) + defer cleanup() + + ctx := context.Background() + store, err := factory.NewStateStore(nil) + require.NoError(t, err) + defer store.Close() + + // Insert multiple keys + keys := []string{"a", "b", "c", "d", "e"} + for _, key := range keys { + err := store.PutState(ctx, key, []byte("value")) + require.NoError(t, err) + } + + // Test ListStates with range + list, err := store.ListStates(ctx, "b", "d") + assert.NoError(t, err) + // Note: When keyPrefix is empty (nil function), ListStates returns full keys + // Check if keys in range are present + hasB := false + hasC := false + for _, item := range list { + if item == "b" { + hasB = true + } + if item == "c" { + hasC = true + } + } + assert.True(t, hasB || hasC, "Should find keys in range") + + // Test ListStates without range + list, err = store.ListStates(ctx, "a", "f") + assert.NoError(t, err) + assert.GreaterOrEqual(t, len(list), len(keys)) +} + +func TestPebbleStateStore_NewKeyOperations(t *testing.T) { + factory, cleanup := setupPebbleTest(t) + defer cleanup() + + ctx := context.Background() + store, err := factory.NewStateStore(nil) + require.NoError(t, err) + defer store.Close() + + keyGroup := []byte{0x01, 0x02} + key := []byte{0x03, 0x04} + namespace := []byte{0x05, 0x06} + userKey := []byte{0x07, 0x08} + value := []byte("test_value") + + // Test Put + err = store.Put(ctx, keyGroup, key, namespace, userKey, value) + assert.NoError(t, err) + + // Test Get + retrieved, err := store.Get(ctx, keyGroup, key, namespace, userKey) + assert.NoError(t, err) + assert.Equal(t, value, retrieved) + + // Test Get for non-existent key + _, err = store.Get(ctx, keyGroup, key, namespace, []byte{0x99}) + assert.ErrorIs(t, err, api.ErrNotFound) + + // Test Delete + err = store.Delete(ctx, keyGroup, key, namespace, userKey) + assert.NoError(t, err) + + _, err = store.Get(ctx, keyGroup, key, namespace, userKey) + assert.ErrorIs(t, err, api.ErrNotFound) +} + +func TestPebbleStateStore_DeleteAll(t *testing.T) { + factory, cleanup := setupPebbleTest(t) + defer cleanup() + + ctx := context.Background() + store, err := factory.NewStateStore(nil) + require.NoError(t, err) + defer store.Close() + + keyGroup := []byte{0x01} + key := []byte{0x02} + namespace := []byte{0x03} + + // Insert multiple keys with same prefix + for i := byte(0); i < 10; i++ { + userKey := []byte{i} + value := []byte{0x10 + i} + err := store.Put(ctx, keyGroup, key, namespace, userKey, value) + require.NoError(t, err) + } + + // Verify all keys exist + for i := byte(0); i < 10; i++ { + userKey := []byte{i} + value, err := store.Get(ctx, keyGroup, key, namespace, userKey) + assert.NoError(t, err) + assert.Equal(t, []byte{0x10 + i}, value) + } + + // Test DeleteAll + err = store.DeleteAll(ctx, keyGroup, key, namespace) + assert.NoError(t, err) + + // Verify all keys are deleted + for i := byte(0); i < 10; i++ { + userKey := []byte{i} + _, err := store.Get(ctx, keyGroup, key, namespace, userKey) + assert.ErrorIs(t, err, api.ErrNotFound) + } + + // Verify keys with different prefix are not deleted + otherKeyGroup := []byte{0x99} + err = store.Put(ctx, otherKeyGroup, key, namespace, []byte{0x01}, []byte("other")) + require.NoError(t, err) + + err = store.DeleteAll(ctx, keyGroup, key, namespace) + assert.NoError(t, err) + + value, err := 
store.Get(ctx, otherKeyGroup, key, namespace, []byte{0x01}) + assert.NoError(t, err) + assert.Equal(t, []byte("other"), value) +} + +func TestPebbleStateStore_DeleteAll_All0xFF(t *testing.T) { + factory, cleanup := setupPebbleTest(t) + defer cleanup() + + ctx := context.Background() + store, err := factory.NewStateStore(nil) + require.NoError(t, err) + defer store.Close() + + // Create prefix that is all 0xFF + keyGroup := []byte{0xFF, 0xFF} + key := []byte{0xFF} + namespace := []byte{} + + // Insert multiple keys + for i := byte(0); i < 5; i++ { + userKey := []byte{i} + value := []byte{0x10 + i} + err := store.Put(ctx, keyGroup, key, namespace, userKey, value) + require.NoError(t, err) + } + + // Test DeleteAll with all-0xFF prefix + err = store.DeleteAll(ctx, keyGroup, key, namespace) + assert.NoError(t, err) + + // Verify all keys are deleted + for i := byte(0); i < 5; i++ { + userKey := []byte{i} + _, err := store.Get(ctx, keyGroup, key, namespace, userKey) + assert.ErrorIs(t, err, api.ErrNotFound) + } +} + +func TestPebbleStateStore_Merge(t *testing.T) { + factory, cleanup := setupPebbleTest(t) + defer cleanup() + + ctx := context.Background() + store, err := factory.NewStateStore(nil) + require.NoError(t, err) + defer store.Close() + + keyGroup := []byte{0x01} + key := []byte{0x02} + namespace := []byte{0x03} + userKey := []byte{0x04} + + // Test Merge - Pebble's Merge appends by default + value1 := []byte("value1") + err = store.Merge(ctx, keyGroup, key, namespace, userKey, value1) + assert.NoError(t, err) + + retrieved, err := store.Get(ctx, keyGroup, key, namespace, userKey) + assert.NoError(t, err) + // First merge just stores the value + assert.Equal(t, value1, retrieved) + + // Test Merge again - Pebble Merge will call the merge operator if configured + // Without a merge operator, it behaves like Put (replaces value) + value2 := []byte("value2") + err = store.Merge(ctx, keyGroup, key, namespace, userKey, value2) + assert.NoError(t, err) + + retrieved, err = store.Get(ctx, keyGroup, key, namespace, userKey) + assert.NoError(t, err) + // Without merge operator, Merge behaves like Put + assert.Equal(t, value2, retrieved) +} + +func TestPebbleStateStore_KeyPrefixIsolation(t *testing.T) { + factory, cleanup := setupPebbleTest(t) + defer cleanup() + + ctx := context.Background() + + // Create two stores with different key prefixes + func1 := &model.Function{ + Name: "func1", + State: config.ConfigMap{"key_prefix": "prefix1"}, + } + store1, err := factory.NewStateStore(func1) + require.NoError(t, err) + defer store1.Close() + + func2 := &model.Function{ + Name: "func2", + State: config.ConfigMap{"key_prefix": "prefix2"}, + } + store2, err := factory.NewStateStore(func2) + require.NoError(t, err) + defer store2.Close() + + // Store same key in both prefixes + key := "same_key" + value1 := []byte("value1") + value2 := []byte("value2") + + err = store1.PutState(ctx, key, value1) + assert.NoError(t, err) + + err = store2.PutState(ctx, key, value2) + assert.NoError(t, err) + + // Verify isolation + retrieved1, err := store1.GetState(ctx, key) + assert.NoError(t, err) + assert.Equal(t, value1, retrieved1) + + retrieved2, err := store2.GetState(ctx, key) + assert.NoError(t, err) + assert.Equal(t, value2, retrieved2) +} + +func TestPebbleStateStore_EmptyValues(t *testing.T) { + factory, cleanup := setupPebbleTest(t) + defer cleanup() + + ctx := context.Background() + store, err := factory.NewStateStore(nil) + require.NoError(t, err) + defer store.Close() + + // Test empty value + err = 
store.PutState(ctx, "key", []byte{}) + assert.NoError(t, err) + + value, err := store.GetState(ctx, "key") + assert.NoError(t, err) + assert.Equal(t, []byte{}, value) + + // Test empty key parts + err = store.Put(ctx, []byte{}, []byte{}, []byte{}, []byte{}, []byte("value")) + assert.NoError(t, err) + + retrieved, err := store.Get(ctx, []byte{}, []byte{}, []byte{}, []byte{}) + assert.NoError(t, err) + assert.Equal(t, []byte("value"), retrieved) +} + +func TestPebbleStateStoreFactory_Config(t *testing.T) { + dir, err := os.MkdirTemp("", "pebble_config_test_*") + require.NoError(t, err) + defer os.RemoveAll(dir) + + cfgMap := config.ConfigMap{ + "dir_name": dir, + "max-open-files": 1000, + "l0-compaction-threshold": 4, + "l0-stop-writes-threshold": 36, + "l-base-max-bytes": int64(256 * 1024 * 1024), + } + + factory, err := statestore.NewPebbleStateStoreFactory(cfgMap) + assert.NoError(t, err) + defer factory.Close() + + // Verify factory creates stores correctly + store, err := factory.NewStateStore(nil) + assert.NoError(t, err) + assert.NotNil(t, store) + + err = store.Close() + assert.NoError(t, err) +} + +func TestPebbleStateStoreFactory_WithFunction(t *testing.T) { + factory, cleanup := setupPebbleTest(t) + defer cleanup() + + ctx := context.Background() + + // Create function with key prefix config + func1 := &model.Function{ + Name: "test_func", + State: config.ConfigMap{"key_prefix": "test_prefix"}, + } + + store, err := factory.NewStateStore(func1) + require.NoError(t, err) + defer store.Close() + + // Test operations + err = store.PutState(ctx, "key", []byte("value")) + assert.NoError(t, err) + + value, err := store.GetState(ctx, "key") + assert.NoError(t, err) + assert.Equal(t, []byte("value"), value) +} + +func TestPebbleStateStoreFactory_DefaultKeyPrefix(t *testing.T) { + factory, cleanup := setupPebbleTest(t) + defer cleanup() + + // Create function without key prefix config, should use function name + func1 := &model.Function{ + Name: "default_func", + State: config.ConfigMap{}, + } + + store, err := factory.NewStateStore(func1) + require.NoError(t, err) + defer store.Close() + + ctx := context.Background() + err = store.PutState(ctx, "key", []byte("value")) + assert.NoError(t, err) + + value, err := store.GetState(ctx, "key") + assert.NoError(t, err) + assert.Equal(t, []byte("value"), value) +} + +func TestPebbleStateStoreFactory_NilFunction(t *testing.T) { + factory, cleanup := setupPebbleTest(t) + defer cleanup() + + store, err := factory.NewStateStore(nil) + require.NoError(t, err) + require.NotNil(t, store) + defer store.Close() + + ctx := context.Background() + err = store.PutState(ctx, "key", []byte("value")) + assert.NoError(t, err) + + value, err := store.GetState(ctx, "key") + assert.NoError(t, err) + assert.Equal(t, []byte("value"), value) +} + +func TestPebbleStateStore_Close(t *testing.T) { + factory, cleanup := setupPebbleTest(t) + defer cleanup() + + func1 := &model.Function{ + Name: "func1", + State: config.ConfigMap{"key_prefix": "prefix1"}, + } + + store, err := factory.NewStateStore(func1) + require.NoError(t, err) + + ctx := context.Background() + err = store.PutState(ctx, "key", []byte("value")) + assert.NoError(t, err) + + // Close should not error + err = store.Close() + assert.NoError(t, err) +} + +func TestPebbleStateStore_BasicOperations_Original(t *testing.T) { ctx := context.Background() storeFact, err := statestore.NewDefaultPebbleStateStoreFactory() - assert.Nil(t, err) + require.NoError(t, err) + defer storeFact.Close() + store, err := 
storeFact.NewStateStore(nil) - assert.Nil(t, err) + require.NoError(t, err) + defer store.Close() _, err = store.GetState(ctx, "key") assert.ErrorIs(t, err, api.ErrNotFound) err = store.PutState(ctx, "key", []byte("value")) - assert.Nil(t, err) + assert.NoError(t, err) value, err := store.GetState(ctx, "key") - assert.Nil(t, err) + assert.NoError(t, err) assert.Equal(t, "value", string(value)) } + +func TestPebbleStateStore_Iterator(t *testing.T) { + factory, cleanup := setupPebbleTest(t) + defer cleanup() + + ctx := context.Background() + store, err := factory.NewStateStore(nil) + require.NoError(t, err) + defer store.Close() + + // Insert multiple keys - use larger data set + keyGroup := []byte{0x01} + key := []byte{0x02} + namespace := []byte{0x03} + + numKeys := 100 + for i := 0; i < numKeys; i++ { + userKey := make([]byte, 2) + userKey[0] = byte(i / 256) + userKey[1] = byte(i % 256) + value := make([]byte, 64) + for j := 0; j < 64; j++ { + value[j] = byte(i*64 + j) + } + err := store.Put(ctx, keyGroup, key, namespace, userKey, value) + require.NoError(t, err) + } + + // Create iterator with prefix + prefix := buildKeyForTest(keyGroup, key, namespace, []byte{}) + iterID, err := store.NewIterator(prefix) + require.NoError(t, err) + defer store.CloseIterator(iterID) + + // Iterate through all keys + var values [][]byte + hasMore, err := store.HasNext(iterID) + require.NoError(t, err) + for hasMore { + value, err := store.Next(iterID) + require.NoError(t, err) + values = append(values, value) + hasMore, err = store.HasNext(iterID) + require.NoError(t, err) + } + + // Verify we got all values + assert.Len(t, values, numKeys) + + // Verify values are correct + expectedValues := make(map[string]bool) + for i := 0; i < numKeys; i++ { + expectedValue := make([]byte, 64) + for j := 0; j < 64; j++ { + expectedValue[j] = byte(i*64 + j) + } + expectedValues[string(expectedValue)] = true + } + + for _, v := range values { + assert.True(t, expectedValues[string(v)], "Value should be present") + } +} + +func TestPebbleStateStore_Iterator_Prefix(t *testing.T) { + factory, cleanup := setupPebbleTest(t) + defer cleanup() + + ctx := context.Background() + store, err := factory.NewStateStore(nil) + require.NoError(t, err) + defer store.Close() + + // Insert keys with different prefixes - use larger data set + keyGroup1 := []byte{0x01} + keyGroup2 := []byte{0x02} + key := []byte{0x03} + namespace := []byte{0x04} + + numKeys1 := 50 + for i := 0; i < numKeys1; i++ { + userKey := make([]byte, 2) + userKey[0] = byte(i / 256) + userKey[1] = byte(i % 256) + value := make([]byte, 32) + for j := 0; j < 32; j++ { + value[j] = byte(0x10 + i) + } + err := store.Put(ctx, keyGroup1, key, namespace, userKey, value) + require.NoError(t, err) + } + + numKeys2 := 30 + for i := 0; i < numKeys2; i++ { + userKey := make([]byte, 2) + userKey[0] = byte(i / 256) + userKey[1] = byte(i % 256) + value := make([]byte, 32) + for j := 0; j < 32; j++ { + value[j] = byte(0x20 + i) + } + err := store.Put(ctx, keyGroup2, key, namespace, userKey, value) + require.NoError(t, err) + } + + // Create iterator with specific prefix + prefix := buildKeyForTest(keyGroup1, key, namespace, []byte{}) + iterID, err := store.NewIterator(prefix) + require.NoError(t, err) + defer store.CloseIterator(iterID) + + // Iterate through keys with first prefix + var values [][]byte + hasMore, err := store.HasNext(iterID) + require.NoError(t, err) + for hasMore { + value, err := store.Next(iterID) + require.NoError(t, err) + values = append(values, value) + 
hasMore, err = store.HasNext(iterID) + require.NoError(t, err) + } + + // Verify we got only values from first prefix + assert.Len(t, values, numKeys1) + + // Verify values are from keyGroup1 + expectedValues := make(map[string]bool) + for i := 0; i < numKeys1; i++ { + expectedValue := make([]byte, 32) + for j := 0; j < 32; j++ { + expectedValue[j] = byte(0x10 + i) + } + expectedValues[string(expectedValue)] = true + } + + for _, v := range values { + assert.True(t, expectedValues[string(v)], "Value should be present") + } +} + +func TestPebbleStateStore_Iterator_EmptyPrefix(t *testing.T) { + factory, cleanup := setupPebbleTest(t) + defer cleanup() + + ctx := context.Background() + store, err := factory.NewStateStore(nil) + require.NoError(t, err) + defer store.Close() + + // Insert multiple keys - use larger data set + numKeys := 80 + for i := 0; i < numKeys; i++ { + key := fmt.Sprintf("key%03d", i) + value := make([]byte, 128) + for j := 0; j < 128; j++ { + value[j] = byte(i*128 + j) + } + err := store.PutState(ctx, key, value) + require.NoError(t, err) + } + + // Create iterator with empty prefix (iterate all) + iterID, err := store.NewIterator([]byte{}) + require.NoError(t, err) + defer store.CloseIterator(iterID) + + // Iterate through all keys + count := 0 + hasMore, err := store.HasNext(iterID) + require.NoError(t, err) + for hasMore { + _, err := store.Next(iterID) + require.NoError(t, err) + count++ + hasMore, err = store.HasNext(iterID) + require.NoError(t, err) + } + + // Verify we got at least the expected number of values + assert.GreaterOrEqual(t, count, numKeys) +} + +// Helper function for tests +func buildKeyForTest(keyGroup, key, namespace, userKey []byte) []byte { + result := make([]byte, 0, len(keyGroup)+len(key)+len(namespace)+len(userKey)) + result = append(result, keyGroup...) + result = append(result, key...) + result = append(result, namespace...) + result = append(result, userKey...) + return result +} diff --git a/fs/statestore/rocksdb.go b/fs/statestore/rocksdb.go new file mode 100644 index 00000000..c6e2c76b --- /dev/null +++ b/fs/statestore/rocksdb.go @@ -0,0 +1,680 @@ +//go:build rocksdb + +/* + * Copyright 2024 Function Stream Org. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package statestore
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"os"
+	"path/filepath"
+	"strings"
+	"sync"
+
+	"github.com/functionstream/function-stream/common/config"
+	"github.com/functionstream/function-stream/common/model"
+
+	"github.com/functionstream/function-stream/fs/api"
+	"github.com/linxGnu/grocksdb"
+)
+
+type RocksDBStateStoreFactory struct {
+	db *grocksdb.DB
+	ro *grocksdb.ReadOptions
+	wo *grocksdb.WriteOptions
+}
+
+type RocksDBStateStoreFactoryConfig struct {
+	DirName              string `json:"dir_name" validate:"required"`
+	MaxOpenFiles         int    `json:"max-open-files" mapstructure:"max-open-files"`
+	WriteBufferSize      uint64 `json:"write-buffer-size" mapstructure:"write-buffer-size"`
+	MaxWriteBufferNumber int    `json:"max-write-buffer-number" mapstructure:"max-write-buffer-number"`
+	TargetFileSizeBase   uint64 `json:"target-file-size-base" mapstructure:"target-file-size-base"`
+	MaxBytesForLevelBase uint64 `json:"max-bytes-for-level-base" mapstructure:"max-bytes-for-level-base"`
+	Compression          string `json:"compression" mapstructure:"compression"` // Compression type: "none", "snappy", "zlib", "bz2", "lz4", "lz4hc", "xpress", "zstd"
+}
+
+type RocksDBStateStoreConfig struct {
+	ColumnFamily string `json:"column-family,omitempty" mapstructure:"column-family,omitempty"`
+}
+
+func NewRocksDBStateStoreFactory(cfgMap config.ConfigMap) (api.StateStoreFactory, error) {
+	c := &RocksDBStateStoreFactoryConfig{}
+	err := cfgMap.ToConfigStruct(c)
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse config: %w", err)
+	}
+
+	// Create RocksDB options
+	opts := grocksdb.NewDefaultOptions()
+	opts.SetCreateIfMissing(true)
+	if c.MaxOpenFiles > 0 {
+		opts.SetMaxOpenFiles(c.MaxOpenFiles)
+	}
+	if c.WriteBufferSize > 0 {
+		opts.SetWriteBufferSize(c.WriteBufferSize)
+	}
+	if c.MaxWriteBufferNumber > 0 {
+		opts.SetMaxWriteBufferNumber(c.MaxWriteBufferNumber)
+	}
+	if c.TargetFileSizeBase > 0 {
+		opts.SetTargetFileSizeBase(c.TargetFileSizeBase)
+	}
+	if c.MaxBytesForLevelBase > 0 {
+		opts.SetMaxBytesForLevelBase(c.MaxBytesForLevelBase)
+	}
+
+	// Set MergeOperator to enable Merge operations
+	opts.SetMergeOperator(&appendOp{})
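Worth flagging here: the parsed `Compression` field is only consulted when building the error hint below and is never applied to `opts`, so the database always opens with RocksDB's default compression setting. A hedged sketch of how it could be wired up, assuming grocksdb's standard `CompressionType` constants (`applyCompression` is a hypothetical helper, not part of this PR):

```go
// Hypothetical helper (not in this PR): translate the config string into a
// grocksdb compression setting before calling grocksdb.OpenDb. With the
// minimal RocksDB build produced by scripts/build-rocksdb.sh, only "none"
// will actually open successfully.
func applyCompression(opts *grocksdb.Options, name string) error {
	switch name {
	case "", "none":
		opts.SetCompression(grocksdb.NoCompression)
	case "snappy":
		opts.SetCompression(grocksdb.SnappyCompression)
	case "lz4":
		opts.SetCompression(grocksdb.LZ4Compression)
	case "zstd":
		opts.SetCompression(grocksdb.ZSTDCompression)
	default:
		return fmt.Errorf("unsupported compression type: %q", name)
	}
	return nil
}
```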
+
+	// Ensure the directory exists
+	if err := os.MkdirAll(c.DirName, 0755); err != nil {
+		return nil, fmt.Errorf("failed to create directory: %w", err)
+	}
+
+	dbPath := filepath.Join(c.DirName, "rocksdb")
+	db, err := grocksdb.OpenDb(opts, dbPath)
+	if err != nil {
+		// Provide a helpful error message for compression-related errors
+		errMsg := strings.ToLower(err.Error())
+		if strings.Contains(errMsg, "compression") && strings.Contains(errMsg, "not linked") {
+			compressionName := c.Compression
+			if compressionName == "" {
+				compressionName = "none"
+			}
+			return nil, fmt.Errorf("failed to open rocksdb: %w. "+
+				"Hint: RocksDB was compiled with minimal features (compression disabled), "+
+				"so compression type %q is unavailable; only 'none' is supported. "+
+				"Recompile RocksDB if compression is needed", err, compressionName)
+		}
+		return nil, fmt.Errorf("failed to open rocksdb: %w", err)
+	}
+
+	ro := grocksdb.NewDefaultReadOptions()
+	wo := grocksdb.NewDefaultWriteOptions()
+	wo.SetSync(false)
+
+	return &RocksDBStateStoreFactory{
+		db: db,
+		ro: ro,
+		wo: wo,
+	}, nil
+}
+
+func (fact *RocksDBStateStoreFactory) NewStateStore(f *model.Function) (api.StateStore, error) {
+	if f == nil {
+		return &RocksDBStateStore{
+			db:             fact.db,
+			ro:             fact.ro,
+			wo:             fact.wo,
+			columnFamily:   nil,
+			cfName:         "",
+			iterators:      make(map[int64]*iteratorInfo),
+			nextIteratorID: 1,
+		}, nil
+	}
+
+	c := &RocksDBStateStoreConfig{}
+	err := f.State.ToConfigStruct(c)
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse config: %w", err)
+	}
+
+	var cfName string
+	if c.ColumnFamily != "" {
+		cfName = c.ColumnFamily
+	} else if f.Name != "" {
+		// If ColumnFamily is not set, use the function name as the default
+		cfName = f.Name
+	}
+
+	// Get or create the column family handle.
+	// Each StateStore manages its own column family handle.
+	var cfHandle *grocksdb.ColumnFamilyHandle
+	if cfName != "" {
+		// Create the column family with default options.
+		// Note: if the column family already exists, CreateColumnFamily returns
+		// an error, and we cannot obtain its handle without reopening the DB
+		// with all column families listed. In that case we silently fall back
+		// to the default column family (nil handle), so per-function isolation
+		// is not guaranteed across factory restarts.
+		cfOpts := grocksdb.NewDefaultOptions()
+		handle, err := fact.db.CreateColumnFamily(cfOpts, cfName)
+		if err != nil {
+			cfHandle = nil
+		} else {
+			cfHandle = handle
+		}
+	}
+
+	return &RocksDBStateStore{
+		db:             fact.db,
+		ro:             fact.ro,
+		wo:             fact.wo,
+		columnFamily:   cfHandle,
+		cfName:         cfName,
+		iterators:      make(map[int64]*iteratorInfo),
+		nextIteratorID: 1,
+	}, nil
+}
+
+func (fact *RocksDBStateStoreFactory) Close() error {
+	// Destroy read/write options
+	if fact.ro != nil {
+		fact.ro.Destroy()
+		fact.ro = nil
+	}
+	if fact.wo != nil {
+		fact.wo.Destroy()
+		fact.wo = nil
+	}
+
+	// Close the database.
+	// Note: column family handles are managed by individual StateStore
+	// instances; they should be destroyed before the factory is closed.
+	if fact.db != nil {
+		fact.db.Close()
+		fact.db = nil
+	}
+
+	return nil
+}
+
+type iteratorInfo struct {
+	iter   *grocksdb.Iterator
+	prefix []byte
+}
+
+type RocksDBStateStore struct {
+	db             *grocksdb.DB
+	ro             *grocksdb.ReadOptions
+	wo             *grocksdb.WriteOptions
+	columnFamily   *grocksdb.ColumnFamilyHandle
+	cfName         string
+	iterators      map[int64]*iteratorInfo
+	nextIteratorID int64
+	iteratorMu     sync.RWMutex
+}
+
+// getKey is no longer needed as we use column families for isolation
+
+func (s *RocksDBStateStore) PutState(ctx context.Context, key string, value []byte) error {
+	keyBytes := []byte(key)
+	var err error
+	if s.columnFamily != nil {
+		err = s.db.PutCF(s.wo, s.columnFamily, keyBytes, value)
+	} else {
+		err = s.db.Put(s.wo, keyBytes, value)
+	}
+	if err != nil {
+		return fmt.Errorf("failed to put state: %w", err)
+	}
+	return nil
+}
+
+func (s *RocksDBStateStore) GetState(ctx context.Context, key string) ([]byte, error) {
+	keyBytes := []byte(key)
+	var value *grocksdb.Slice
+	var err error
+	if s.columnFamily != nil {
+		value, err = s.db.GetCF(s.ro, s.columnFamily, keyBytes)
+	} else {
+		value, err = s.db.Get(s.ro, keyBytes)
+	}
+	if err != nil {
+		return nil, fmt.Errorf("failed to get state: %w", err)
+	}
+	defer value.Free()
+
+	if !value.Exists() {
+		return nil, api.ErrNotFound
+	}
+
+	// Return a copy of the
data + result := make([]byte, len(value.Data())) + copy(result, value.Data()) + return result, nil +} + +func (s *RocksDBStateStore) ListStates( + ctx context.Context, startInclusive string, endExclusive string) ([]string, error) { + startKey := []byte(startInclusive) + endKey := []byte(endExclusive) + + var iter *grocksdb.Iterator + if s.columnFamily != nil { + iter = s.db.NewIteratorCF(s.ro, s.columnFamily) + } else { + iter = s.db.NewIterator(s.ro) + } + defer iter.Close() + + var keys []string + seenKeys := make(map[string]bool) + iter.Seek(startKey) + for iter.Valid() { + keyBytes := iter.Key() + keyData := keyBytes.Data() + + // Check if we've exceeded the end key + if len(endKey) > 0 { + if bytes.Compare(keyData, endKey) >= 0 { + break + } + } + + keyStr := string(keyData) + if !seenKeys[keyStr] { + if (startInclusive == "" || keyStr >= startInclusive) && (endExclusive == "" || keyStr < endExclusive) { + keys = append(keys, keyStr) + seenKeys[keyStr] = true + } + } + + iter.Next() + } + + return keys, nil +} + +func (s *RocksDBStateStore) DeleteState(ctx context.Context, key string) error { + keyBytes := []byte(key) + var err error + if s.columnFamily != nil { + err = s.db.DeleteCF(s.wo, s.columnFamily, keyBytes) + } else { + err = s.db.Delete(s.wo, keyBytes) + } + if err != nil { + return fmt.Errorf("failed to delete state: %w", err) + } + return nil +} + +// Put stores a key-value pair +func (rb *RocksDBStateStore) Put(ctx context.Context, keyGroup, key, namespace, userKey []byte, value []byte) error { + keyBytes := buildKey(keyGroup, key, namespace, userKey) + + // Directly store value without checking if key exists + var err error + if rb.columnFamily != nil { + err = rb.db.PutCF(rb.wo, rb.columnFamily, keyBytes, value) + } else { + err = rb.db.Put(rb.wo, keyBytes, value) + } + if err != nil { + return fmt.Errorf("failed to put key: %w", err) + } + + return nil +} + +// Get retrieves a value +func (rb *RocksDBStateStore) Get(ctx context.Context, keyGroup, key, namespace, userKey []byte) ([]byte, error) { + keyBytes := buildKey(keyGroup, key, namespace, userKey) + + var value *grocksdb.Slice + var err error + if rb.columnFamily != nil { + value, err = rb.db.GetCF(rb.ro, rb.columnFamily, keyBytes) + } else { + value, err = rb.db.Get(rb.ro, keyBytes) + } + if err != nil { + return nil, fmt.Errorf("failed to get key: %w", err) + } + defer value.Free() + + if !value.Exists() { + return nil, api.ErrNotFound + } + + // Return a copy of the data + result := make([]byte, len(value.Data())) + copy(result, value.Data()) + return result, nil +} + +// Delete removes a key +func (rb *RocksDBStateStore) Delete(ctx context.Context, keyGroup, key, namespace, userKey []byte) error { + keyBytes := buildKey(keyGroup, key, namespace, userKey) + + // Delete the key + var err error + if rb.columnFamily != nil { + err = rb.db.DeleteCF(rb.wo, rb.columnFamily, keyBytes) + } else { + err = rb.db.Delete(rb.wo, keyBytes) + } + if err != nil { + return fmt.Errorf("failed to delete key: %w", err) + } + + return nil +} + +// DeleteAll deletes all keys under the specified prefix - uses range deletion +// +// Prefix matching logic: +// - Start key: buildKey(keyGroup, key, namespace, []) +// This is the exact prefix we want to delete +// - End key: incrementKey(startKey) +// This creates the smallest key that is greater than any key starting with the prefix +// +// How it works: +// +// In lexicographic byte order: +// - Any key starting with prefix P: [P] < [P, ...] 
< incrementKey(P) +// - RocksDB iterator with range [start, end) returns all keys where start <= key < end +// +// Examples: +// +// Prefix: [0x01, 0x02, 0x03] +// - startKey = [0x01, 0x02, 0x03] +// - endKey = incrementKey([0x01, 0x02, 0x03]) = [0x01, 0x02, 0x04] +// - Deletes: [0x01, 0x02, 0x03], [0x01, 0x02, 0x03, 0x00], [0x01, 0x02, 0x03, 0xFF, ...] +// - Does NOT delete: [0x01, 0x02, 0x04] (>= endKey) +// +// Prefix: [0x01, 0x02, 0xFF] (handles overflow correctly) +// - startKey = [0x01, 0x02, 0xFF] +// - endKey = incrementKey([0x01, 0x02, 0xFF]) = [0x01, 0x03, 0x00] +// - Correctly handles 0xFF overflow +func (rb *RocksDBStateStore) DeleteAll(ctx context.Context, keyGroup, key, namespace []byte) error { + // Create start key: buildKey(keyGroup, key, namespace, []) + // This is the exact prefix we want to delete + startKeyBytes := buildKey(keyGroup, key, namespace, []byte{}) + + if len(startKeyBytes) == 0 { + return fmt.Errorf("empty key prefix") + } + + // Check if prefix is all 0xFF - this is a special case where range deletion + // would miss some keys, so we use iterator-based deletion instead + if isAll0xFF(startKeyBytes) { + // Use iterator-based deletion for all-0xFF prefix + // Note: We cannot use an upper bound here because all-0xFF prefix can extend + // to arbitrary length. We must iterate and check prefix manually. + var iter *grocksdb.Iterator + if rb.columnFamily != nil { + iter = rb.db.NewIteratorCF(rb.ro, rb.columnFamily) + } else { + iter = rb.db.NewIterator(rb.ro) + } + defer iter.Close() + + batch := grocksdb.NewWriteBatch() + defer batch.Destroy() + + // Optimized: batch deletions and commit once + iter.Seek(startKeyBytes) + for iter.Valid() { + keyBytes := iter.Key() + keyData := keyBytes.Data() + + // Check if key still starts with prefix + if !bytes.HasPrefix(keyData, startKeyBytes) { + break + } + + // Copy key data since iterator key is only valid until Next() + k := make([]byte, len(keyData)) + copy(k, keyData) + if rb.columnFamily != nil { + batch.DeleteCF(rb.columnFamily, k) + } else { + batch.Delete(k) + } + iter.Next() + } + + if err := rb.db.Write(rb.wo, batch); err != nil { + return fmt.Errorf("failed to batch delete: %w", err) + } + return nil + } + + // Create end key by incrementing the prefix + // This creates the smallest key > any key starting with startKeyBytes + // Example: [0x01, 0x02, 0x03] -> [0x01, 0x02, 0x04] + // This ensures all keys with prefix [0x01, 0x02, 0x03] are deleted + // + // Why incrementKey instead of finding the last actual key? + // 1. Performance: incrementKey is O(1), finding last key requires O(n) iteration + // 2. Correctness: incrementKey is mathematically correct (upper bound), independent of actual data + // 3. Dynamic data: Works correctly even if keys are inserted concurrently + // 4. Engine optimization: Range deletion can use engine-level optimizations without reading all keys + // Note: If we already iterated all keys, we should delete them directly (as in all-0xFF case) + endKeyBytes := incrementKey(startKeyBytes) + + // Use RocksDB's range deletion feature via WriteBatch + // DeleteRange creates a range tombstone, which is much more efficient than + // iterating and deleting individual keys. It uses O(1) write operations + // instead of O(n) where n is the number of keys to delete. 
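The helpers referenced here (buildKey, isAll0xFF, incrementKey) are shared with the Pebble implementation and are not part of this hunk. For readers following the range math, a minimal sketch of incrementKey consistent with the examples in the comment above:

```go
// Sketch of the prefix-successor helper described above (the real
// incrementKey lives elsewhere in package statestore). It returns the
// smallest key strictly greater than every key starting with prefix,
// carrying past 0xFF bytes: [0x01,0x02,0x03] -> [0x01,0x02,0x04] and
// [0x01,0x02,0xFF] -> [0x01,0x03,0x00].
func incrementKey(prefix []byte) []byte {
	end := make([]byte, len(prefix))
	copy(end, prefix)
	for i := len(end) - 1; i >= 0; i-- {
		if end[i] != 0xFF {
			end[i]++
			return end
		}
		end[i] = 0x00 // carry into the next byte
	}
	// All bytes were 0xFF: no finite upper bound exists, which is why
	// DeleteAll falls back to iterator-based deletion for such prefixes.
	return nil
}
```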
+	batch := grocksdb.NewWriteBatch()
+	defer batch.Destroy()
+
+	if rb.columnFamily != nil {
+		// Use DeleteRangeCF for the column family
+		batch.DeleteRangeCF(rb.columnFamily, startKeyBytes, endKeyBytes)
+	} else {
+		// Use DeleteRange for the default column family
+		batch.DeleteRange(startKeyBytes, endKeyBytes)
+	}
+
+	// Write the batch to apply the range deletion
+	if err := rb.db.Write(rb.wo, batch); err != nil {
+		return fmt.Errorf("failed to delete range: %w", err)
+	}
+
+	return nil
+}
+
+// Merge merges values using RocksDB's native Merge operation.
+// It requires a MergeOperator to be configured when opening the database;
+// the factory installs appendOp, which concatenates operands, so
+// Merge(k, "a") followed by Merge(k, "b") yields "ab" on the next Get.
+func (rb *RocksDBStateStore) Merge(ctx context.Context, keyGroup, key, namespace, userKey []byte, value []byte) error {
+	keyBytes := buildKey(keyGroup, key, namespace, userKey)
+
+	// Use RocksDB's native Merge operation; the configured MergeOperator
+	// (appendOp) handles combining the values
+	var err error
+	if rb.columnFamily != nil {
+		err = rb.db.MergeCF(rb.wo, rb.columnFamily, keyBytes, value)
+	} else {
+		err = rb.db.Merge(rb.wo, keyBytes, value)
+	}
+	if err != nil {
+		return fmt.Errorf("failed to merge value: %w", err)
+	}
+
+	return nil
+}
+
+func (s *RocksDBStateStore) Close() error {
+	// Close all active iterators
+	s.iteratorMu.Lock()
+	for id, info := range s.iterators {
+		if info.iter != nil {
+			info.iter.Close()
+		}
+		delete(s.iterators, id)
+	}
+	s.iteratorMu.Unlock()
+
+	// Destroy the column family handle if we own one.
+	// ReadOptions, WriteOptions, and the DB itself are owned by the factory,
+	// so don't destroy them here.
+	if s.columnFamily != nil {
+		s.columnFamily.Destroy()
+		s.columnFamily = nil
+	}
+	return nil
+}
+
+// NewIterator creates a new iterator with the specified prefix and returns its ID.
+// The iterator will iterate over all keys that start with the given prefix.
+func (s *RocksDBStateStore) NewIterator(prefix []byte) (int64, error) {
+	s.iteratorMu.Lock()
+	defer s.iteratorMu.Unlock()
+
+	// Generate a new iterator ID
+	id := s.nextIteratorID
+	s.nextIteratorID++
+
+	// Create a new iterator
+	var iter *grocksdb.Iterator
+	if s.columnFamily != nil {
+		iter = s.db.NewIteratorCF(s.ro, s.columnFamily)
+	} else {
+		iter = s.db.NewIterator(s.ro)
+	}
+
+	// Store the prefix for later validation
+	prefixCopy := make([]byte, len(prefix))
+	copy(prefixCopy, prefix)
+
+	// Seek to the prefix
+	if len(prefix) > 0 {
+		iter.Seek(prefix)
+	} else {
+		// If the prefix is empty, seek to the beginning
+		iter.SeekToFirst()
+	}
+
+	// Store the iterator info
+	s.iterators[id] = &iteratorInfo{
+		iter:   iter,
+		prefix: prefixCopy,
+	}
+
+	return id, nil
+}
+
+// HasNext checks if the iterator has a next element.
+// It returns true if there is a next element, false otherwise.
+func (s *RocksDBStateStore) HasNext(id int64) (bool, error) {
+	// Take a read lock: the iterator map may be mutated concurrently by
+	// NewIterator/CloseIterator
+	s.iteratorMu.RLock()
+	defer s.iteratorMu.RUnlock()
+
+	info, exists := s.iterators[id]
+	if !exists {
+		return false, fmt.Errorf("iterator with id %d not found", id)
+	}
+
+	if info.iter == nil {
+		return false, fmt.Errorf("iterator with id %d is closed", id)
+	}
+
+	// Check if the iterator is valid
+	if !info.iter.Valid() {
+		return false, nil
+	}
+
+	// If a prefix is specified, check if the current key still has the prefix
+	if len(info.prefix) > 0 {
+		keyBytes := info.iter.Key()
+		keyData := keyBytes.Data()
+		if !bytes.HasPrefix(keyData, info.prefix) {
+			return false, nil
+		}
+	}
+
+	return true, nil
+}
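The intended call sequence for this handle-based API is NewIterator, then HasNext/Next in a loop, then CloseIterator. A minimal consumption sketch, assuming a store obtained from the factory and eliding error handling for brevity:

```go
// Usage sketch (store is assumed to come from factory.NewStateStore;
// errors elided). buildKey is the shared key-encoding helper.
prefix := buildKey([]byte{0x01}, []byte{0x02}, []byte{0x03}, nil)
iterID, _ := store.NewIterator(prefix)
defer store.CloseIterator(iterID)

for {
	ok, _ := store.HasNext(iterID)
	if !ok {
		break
	}
	value, _ := store.Next(iterID) // Next returns a copy; safe to retain
	fmt.Printf("value: %x\n", value)
}
```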
+// Next returns the value of the next element and advances the iterator.
+// It returns the value as a byte slice.
+func (s *RocksDBStateStore) Next(id int64) ([]byte, error) {
+	// Take a read lock to protect the iterator map (see HasNext)
+	s.iteratorMu.RLock()
+	defer s.iteratorMu.RUnlock()
+
+	info, exists := s.iterators[id]
+	if !exists {
+		return nil, fmt.Errorf("iterator with id %d not found", id)
+	}
+
+	if info.iter == nil {
+		return nil, fmt.Errorf("iterator with id %d is closed", id)
+	}
+
+	// Check if the iterator is valid
+	if !info.iter.Valid() {
+		return nil, fmt.Errorf("iterator has no more elements")
+	}
+
+	// If a prefix is specified, verify the current key has the prefix
+	if len(info.prefix) > 0 {
+		keyBytes := info.iter.Key()
+		keyData := keyBytes.Data()
+		if !bytes.HasPrefix(keyData, info.prefix) {
+			return nil, fmt.Errorf("iterator has no more elements")
+		}
+	}
+
+	// Get the value
+	valueBytes := info.iter.Value()
+	valueData := valueBytes.Data()
+
+	// Return a copy of the data since iterator data may be invalidated on Next()
+	result := make([]byte, len(valueData))
+	copy(result, valueData)
+
+	// Advance the iterator
+	info.iter.Next()
+
+	return result, nil
+}
+
+// CloseIterator closes the iterator with the specified ID.
+func (s *RocksDBStateStore) CloseIterator(id int64) error {
+	s.iteratorMu.Lock()
+	defer s.iteratorMu.Unlock()
+
+	info, exists := s.iterators[id]
+	if !exists {
+		return fmt.Errorf("iterator with id %d not found", id)
+	}
+
+	// Close the iterator
+	if info.iter != nil {
+		info.iter.Close()
+	}
+
+	// Remove it from the map
+	delete(s.iterators, id)
+
+	return nil
+}
+
+type appendOp struct{}
+
+// FullMerge concatenates the existing value (if any) with the merge
+// operands, oldest first, which gives Merge append semantics.
+func (a *appendOp) FullMerge(key, existingValue []byte, operands [][]byte) (
+	[]byte, bool) {
+	var buf bytes.Buffer
+	if existingValue != nil {
+		buf.Write(existingValue)
+	}
+	for _, op := range operands {
+		buf.Write(op)
+	}
+	return buf.Bytes(), true
+}
+
+// PartialMerge combines two adjacent operands; leftOperand is the older
+// one, so it is written first to preserve append order.
+func (a *appendOp) PartialMerge(key, leftOperand, rightOperand []byte) (
+	[]byte, bool) {
+	var buf bytes.Buffer
+	buf.Write(leftOperand)
+	buf.Write(rightOperand)
+	return buf.Bytes(), true
+}
+
+func (a *appendOp) Name() string { return "appendOp" }
diff --git a/fs/statestore/rocksdb_test.go b/fs/statestore/rocksdb_test.go
new file mode 100644
index 00000000..5a656caf
--- /dev/null
+++ b/fs/statestore/rocksdb_test.go
@@ -0,0 +1,738 @@
+//go:build rocksdb
+
+/*
+ * Copyright 2024 Function Stream Org.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package statestore_test + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + "github.com/functionstream/function-stream/common/config" + "github.com/functionstream/function-stream/common/model" + "github.com/functionstream/function-stream/fs/api" + "github.com/functionstream/function-stream/fs/statestore" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func setupRocksDBTest(t *testing.T) (api.StateStoreFactory, func()) { + dir, err := os.MkdirTemp("", "rocksdb_test_*") + require.NoError(t, err) + + cfgMap := config.ConfigMap{ + "dir_name": dir, + } + + factory, err := statestore.NewRocksDBStateStoreFactory(cfgMap) + require.NoError(t, err) + + cleanup := func() { + // Close factory first - this will close the database + if err := factory.Close(); err != nil { + t.Logf("Error closing factory: %v", err) + } + + // On Windows, RocksDB files may need a moment to be fully released + // Attempt to remove with retry + var removeErr error + for i := 0; i < 3; i++ { + removeErr = os.RemoveAll(dir) + if removeErr == nil { + return + } + // Small delay before retry (especially helpful on Windows) + if i < 2 { + time.Sleep(100 * time.Millisecond) + } + } + + // Log error but don't fail the test - temp dirs will be cleaned up eventually + if removeErr != nil { + t.Logf("Warning: Failed to remove test directory %s after 3 attempts: %v. It may be cleaned up later.", dir, removeErr) + } + } + + return factory, cleanup +} + +func TestRocksDBStateStore_BasicOperations(t *testing.T) { + factory, cleanup := setupRocksDBTest(t) + defer cleanup() + + ctx := context.Background() + store, err := factory.NewStateStore(nil) + require.NoError(t, err) + defer store.Close() + + // Test PutState and GetState + err = store.PutState(ctx, "key1", []byte("value1")) + assert.NoError(t, err) + + value, err := store.GetState(ctx, "key1") + assert.NoError(t, err) + assert.Equal(t, []byte("value1"), value) + + // Test GetState for non-existent key + _, err = store.GetState(ctx, "nonexistent") + assert.ErrorIs(t, err, api.ErrNotFound) + + // Test update + err = store.PutState(ctx, "key1", []byte("value1_updated")) + assert.NoError(t, err) + + value, err = store.GetState(ctx, "key1") + assert.NoError(t, err) + assert.Equal(t, []byte("value1_updated"), value) + + // Test DeleteState + err = store.DeleteState(ctx, "key1") + assert.NoError(t, err) + + _, err = store.GetState(ctx, "key1") + assert.ErrorIs(t, err, api.ErrNotFound) +} + +func TestRocksDBStateStore_ListStates(t *testing.T) { + factory, cleanup := setupRocksDBTest(t) + defer cleanup() + + ctx := context.Background() + store, err := factory.NewStateStore(nil) + require.NoError(t, err) + defer store.Close() + + // Insert multiple keys + keys := []string{"a", "b", "c", "d", "e"} + for _, key := range keys { + err := store.PutState(ctx, key, []byte("value")) + require.NoError(t, err) + } + + // Test ListStates with range + list, err := store.ListStates(ctx, "b", "d") + assert.NoError(t, err) + assert.Contains(t, list, "b") + assert.Contains(t, list, "c") + assert.NotContains(t, list, "a") + assert.NotContains(t, list, "d") + assert.NotContains(t, list, "e") + + // Test ListStates without range + list, err = store.ListStates(ctx, "", "") + assert.NoError(t, err) + assert.GreaterOrEqual(t, len(list), len(keys)) +} + +func TestRocksDBStateStore_NewKeyOperations(t *testing.T) { + factory, cleanup := setupRocksDBTest(t) + defer cleanup() + + ctx := context.Background() + store, err := factory.NewStateStore(nil) + 
require.NoError(t, err) + defer store.Close() + + keyGroup := []byte{0x01, 0x02} + key := []byte{0x03, 0x04} + namespace := []byte{0x05, 0x06} + userKey := []byte{0x07, 0x08} + value := []byte("test_value") + + // Test Put + err = store.Put(ctx, keyGroup, key, namespace, userKey, value) + assert.NoError(t, err) + + // Test Get + retrieved, err := store.Get(ctx, keyGroup, key, namespace, userKey) + assert.NoError(t, err) + assert.Equal(t, value, retrieved) + + // Test Get for non-existent key + _, err = store.Get(ctx, keyGroup, key, namespace, []byte{0x99}) + assert.ErrorIs(t, err, api.ErrNotFound) + + // Test Delete + err = store.Delete(ctx, keyGroup, key, namespace, userKey) + assert.NoError(t, err) + + _, err = store.Get(ctx, keyGroup, key, namespace, userKey) + assert.ErrorIs(t, err, api.ErrNotFound) +} + +func TestRocksDBStateStore_DeleteAll(t *testing.T) { + factory, cleanup := setupRocksDBTest(t) + defer cleanup() + + ctx := context.Background() + store, err := factory.NewStateStore(nil) + require.NoError(t, err) + defer store.Close() + + keyGroup := []byte{0x01} + key := []byte{0x02} + namespace := []byte{0x03} + + // Insert multiple keys with same prefix + for i := byte(0); i < 10; i++ { + userKey := []byte{i} + value := []byte{0x10 + i} + err := store.Put(ctx, keyGroup, key, namespace, userKey, value) + require.NoError(t, err) + } + + // Verify all keys exist + for i := byte(0); i < 10; i++ { + userKey := []byte{i} + value, err := store.Get(ctx, keyGroup, key, namespace, userKey) + assert.NoError(t, err) + assert.Equal(t, []byte{0x10 + i}, value) + } + + // Test DeleteAll + err = store.DeleteAll(ctx, keyGroup, key, namespace) + assert.NoError(t, err) + + // Verify all keys are deleted + for i := byte(0); i < 10; i++ { + userKey := []byte{i} + _, err := store.Get(ctx, keyGroup, key, namespace, userKey) + assert.ErrorIs(t, err, api.ErrNotFound) + } + + // Verify keys with different prefix are not deleted + otherKeyGroup := []byte{0x99} + err = store.Put(ctx, otherKeyGroup, key, namespace, []byte{0x01}, []byte("other")) + require.NoError(t, err) + + err = store.DeleteAll(ctx, keyGroup, key, namespace) + assert.NoError(t, err) + + value, err := store.Get(ctx, otherKeyGroup, key, namespace, []byte{0x01}) + assert.NoError(t, err) + assert.Equal(t, []byte("other"), value) +} + +func TestRocksDBStateStore_DeleteAll_All0xFF(t *testing.T) { + factory, cleanup := setupRocksDBTest(t) + defer cleanup() + + ctx := context.Background() + store, err := factory.NewStateStore(nil) + require.NoError(t, err) + defer store.Close() + + // Create prefix that is all 0xFF + keyGroup := []byte{0xFF, 0xFF} + key := []byte{0xFF} + namespace := []byte{} + + // Insert multiple keys + for i := byte(0); i < 5; i++ { + userKey := []byte{i} + value := []byte{0x10 + i} + err := store.Put(ctx, keyGroup, key, namespace, userKey, value) + require.NoError(t, err) + } + + // Test DeleteAll with all-0xFF prefix + err = store.DeleteAll(ctx, keyGroup, key, namespace) + assert.NoError(t, err) + + // Verify all keys are deleted + for i := byte(0); i < 5; i++ { + userKey := []byte{i} + _, err := store.Get(ctx, keyGroup, key, namespace, userKey) + assert.ErrorIs(t, err, api.ErrNotFound) + } +} + +func TestRocksDBStateStore_Merge(t *testing.T) { + factory, cleanup := setupRocksDBTest(t) + defer cleanup() + + ctx := context.Background() + store, err := factory.NewStateStore(nil) + require.NoError(t, err) + defer store.Close() + + keyGroup := []byte{0x01} + key := []byte{0x02} + namespace := []byte{0x03} + userKey := []byte{0x04} 
+
+	// Test Merge - the factory installs the appendOp merge operator,
+	// so successive merges concatenate their operands
+	value1 := []byte("value1")
+	err = store.Merge(ctx, keyGroup, key, namespace, userKey, value1)
+	assert.NoError(t, err)
+
+	retrieved, err := store.Get(ctx, keyGroup, key, namespace, userKey)
+	assert.NoError(t, err)
+	// The first merge on a missing key simply stores the single operand
+	assert.Equal(t, value1, retrieved)
+
+	// Test Merge again - the second operand is appended to the first
+	value2 := []byte("value2")
+	err = store.Merge(ctx, keyGroup, key, namespace, userKey, value2)
+	assert.NoError(t, err)
+
+	retrieved, err = store.Get(ctx, keyGroup, key, namespace, userKey)
+	assert.NoError(t, err)
+	assert.Equal(t, append(value1, value2...), retrieved)
+}
+
+func TestRocksDBStateStore_ColumnFamilyIsolation(t *testing.T) {
+	factory, cleanup := setupRocksDBTest(t)
+	defer cleanup()
+
+	ctx := context.Background()
+
+	// Create two stores with different column families
+	func1 := &model.Function{
+		Name:  "func1",
+		State: config.ConfigMap{"column-family": "cf1"},
+	}
+	store1, err := factory.NewStateStore(func1)
+	require.NoError(t, err)
+	defer store1.Close()
+
+	func2 := &model.Function{
+		Name:  "func2",
+		State: config.ConfigMap{"column-family": "cf2"},
+	}
+	store2, err := factory.NewStateStore(func2)
+	require.NoError(t, err)
+	defer store2.Close()
+
+	// Store the same key in both column families
+	key := "same_key"
+	value1 := []byte("value1")
+	value2 := []byte("value2")
+
+	err = store1.PutState(ctx, key, value1)
+	assert.NoError(t, err)
+
+	err = store2.PutState(ctx, key, value2)
+	assert.NoError(t, err)
+
+	// Verify isolation
+	retrieved1, err := store1.GetState(ctx, key)
+	assert.NoError(t, err)
+	assert.Equal(t, value1, retrieved1)
+
+	retrieved2, err := store2.GetState(ctx, key)
+	assert.NoError(t, err)
+	assert.Equal(t, value2, retrieved2)
+}
+
+func TestRocksDBStateStore_EmptyValues(t *testing.T) {
+	factory, cleanup := setupRocksDBTest(t)
+	defer cleanup()
+
+	ctx := context.Background()
+	store, err := factory.NewStateStore(nil)
+	require.NoError(t, err)
+	defer store.Close()
+
+	// Test empty value
+	err = store.PutState(ctx, "key", []byte{})
+	assert.NoError(t, err)
+
+	value, err := store.GetState(ctx, "key")
+	assert.NoError(t, err)
+	assert.Equal(t, []byte{}, value)
+
+	// Test empty key parts
+	err = store.Put(ctx, []byte{}, []byte{}, []byte{}, []byte{}, []byte("value"))
+	assert.NoError(t, err)
+
+	retrieved, err := store.Get(ctx, []byte{}, []byte{}, []byte{}, []byte{})
+	assert.NoError(t, err)
+	assert.Equal(t, []byte("value"), retrieved)
+}
+
+func TestRocksDBStateStoreFactory_Config(t *testing.T) {
+	dir, err := os.MkdirTemp("", "rocksdb_config_test_*")
+	require.NoError(t, err)
+	defer os.RemoveAll(dir)
+
+	cfgMap := config.ConfigMap{
+		"dir_name":                 dir,
+		"max-open-files":           1000,
+		"write-buffer-size":        uint64(1024 * 1024),
+		"max-write-buffer-number":  4,
+		"target-file-size-base":    uint64(64 * 1024 * 1024),
+		"max-bytes-for-level-base": uint64(256 * 1024 * 1024),
+		"compression":              "snappy",
+	}
+
+	factory, err := statestore.NewRocksDBStateStoreFactory(cfgMap)
+	if !assert.NoError(t, err) {
+		return // if factory creation fails, bail out to avoid a nil pointer dereference
+	}
+	assert.NotNil(t, factory)
+	defer factory.Close()
+
+	// Verify the factory creates stores correctly
+	store, err := factory.NewStateStore(nil)
+	assert.NoError(t, err)
+	assert.NotNil(t, store)
+
+	err = store.Close()
+	assert.NoError(t, err)
+}
+
+func TestRocksDBStateStoreFactory_WithFunction(t *testing.T) {
+	factory, cleanup := setupRocksDBTest(t)
+	defer cleanup()
+
+	ctx :=
context.Background() + + // Create function with column family config + func1 := &model.Function{ + Name: "test_func", + State: config.ConfigMap{"column-family": "test_cf"}, + } + + store, err := factory.NewStateStore(func1) + require.NoError(t, err) + defer store.Close() + + // Test operations + err = store.PutState(ctx, "key", []byte("value")) + assert.NoError(t, err) + + value, err := store.GetState(ctx, "key") + assert.NoError(t, err) + assert.Equal(t, []byte("value"), value) +} + +func TestRocksDBStateStoreFactory_DefaultColumnFamily(t *testing.T) { + factory, cleanup := setupRocksDBTest(t) + defer cleanup() + + // Create function without column family config, should use function name + func1 := &model.Function{ + Name: "default_func", + State: config.ConfigMap{}, + } + + store, err := factory.NewStateStore(func1) + require.NoError(t, err) + defer store.Close() + + ctx := context.Background() + err = store.PutState(ctx, "key", []byte("value")) + assert.NoError(t, err) + + value, err := store.GetState(ctx, "key") + assert.NoError(t, err) + assert.Equal(t, []byte("value"), value) +} + +func TestRocksDBStateStoreFactory_NilFunction(t *testing.T) { + factory, cleanup := setupRocksDBTest(t) + defer cleanup() + + store, err := factory.NewStateStore(nil) + require.NoError(t, err) + require.NotNil(t, store) + defer store.Close() + + ctx := context.Background() + err = store.PutState(ctx, "key", []byte("value")) + assert.NoError(t, err) + + value, err := store.GetState(ctx, "key") + assert.NoError(t, err) + assert.Equal(t, []byte("value"), value) +} + +func TestRocksDBStateStore_Close(t *testing.T) { + factory, cleanup := setupRocksDBTest(t) + defer cleanup() + + func1 := &model.Function{ + Name: "func1", + State: config.ConfigMap{"column-family": "cf1"}, + } + + store, err := factory.NewStateStore(func1) + require.NoError(t, err) + + ctx := context.Background() + err = store.PutState(ctx, "key", []byte("value")) + assert.NoError(t, err) + + // Close should not error + err = store.Close() + assert.NoError(t, err) + + // Operations after close should fail or behave unpredictably + // (depends on implementation, but close should succeed) +} + +func TestRocksDBStateStore_Iterator(t *testing.T) { + factory, cleanup := setupRocksDBTest(t) + defer cleanup() + + ctx := context.Background() + store, err := factory.NewStateStore(nil) + require.NoError(t, err) + defer store.Close() + + // Insert multiple keys - use larger data set + keyGroup := []byte{0x01} + key := []byte{0x02} + namespace := []byte{0x03} + + numKeys := 100 + for i := 0; i < numKeys; i++ { + userKey := make([]byte, 2) + userKey[0] = byte(i / 256) + userKey[1] = byte(i % 256) + value := make([]byte, 64) + for j := 0; j < 64; j++ { + value[j] = byte(i*64 + j) + } + err := store.Put(ctx, keyGroup, key, namespace, userKey, value) + require.NoError(t, err) + } + + // Create iterator with prefix + prefix := buildKeyForRocksDBTest(keyGroup, key, namespace, []byte{}) + iterID, err := store.NewIterator(prefix) + require.NoError(t, err) + defer store.CloseIterator(iterID) + + // Iterate through all keys + var values [][]byte + hasMore, err := store.HasNext(iterID) + require.NoError(t, err) + for hasMore { + value, err := store.Next(iterID) + require.NoError(t, err) + values = append(values, value) + hasMore, err = store.HasNext(iterID) + require.NoError(t, err) + } + + // Verify we got all values + assert.Len(t, values, numKeys) + + // Verify values are correct + expectedValues := make(map[string]bool) + for i := 0; i < numKeys; i++ { + 
expectedValue := make([]byte, 64) + for j := 0; j < 64; j++ { + expectedValue[j] = byte(i*64 + j) + } + expectedValues[string(expectedValue)] = true + } + + for _, v := range values { + assert.True(t, expectedValues[string(v)], "Value should be present") + } +} + +func TestRocksDBStateStore_Iterator_Prefix(t *testing.T) { + factory, cleanup := setupRocksDBTest(t) + defer cleanup() + + ctx := context.Background() + store, err := factory.NewStateStore(nil) + require.NoError(t, err) + defer store.Close() + + // Insert keys with different prefixes - use larger data set + keyGroup1 := []byte{0x01} + keyGroup2 := []byte{0x02} + key := []byte{0x03} + namespace := []byte{0x04} + + numKeys1 := 50 + for i := 0; i < numKeys1; i++ { + userKey := make([]byte, 2) + userKey[0] = byte(i / 256) + userKey[1] = byte(i % 256) + value := make([]byte, 32) + for j := 0; j < 32; j++ { + value[j] = byte(0x10 + i) + } + err := store.Put(ctx, keyGroup1, key, namespace, userKey, value) + require.NoError(t, err) + } + + numKeys2 := 30 + for i := 0; i < numKeys2; i++ { + userKey := make([]byte, 2) + userKey[0] = byte(i / 256) + userKey[1] = byte(i % 256) + value := make([]byte, 32) + for j := 0; j < 32; j++ { + value[j] = byte(0x20 + i) + } + err := store.Put(ctx, keyGroup2, key, namespace, userKey, value) + require.NoError(t, err) + } + + // Create iterator with specific prefix + prefix := buildKeyForRocksDBTest(keyGroup1, key, namespace, []byte{}) + iterID, err := store.NewIterator(prefix) + require.NoError(t, err) + defer store.CloseIterator(iterID) + + // Iterate through keys with first prefix + var values [][]byte + hasMore, err := store.HasNext(iterID) + require.NoError(t, err) + for hasMore { + value, err := store.Next(iterID) + require.NoError(t, err) + values = append(values, value) + hasMore, err = store.HasNext(iterID) + require.NoError(t, err) + } + + // Verify we got only values from first prefix + assert.Len(t, values, numKeys1) + + // Verify values are from keyGroup1 + expectedValues := make(map[string]bool) + for i := 0; i < numKeys1; i++ { + expectedValue := make([]byte, 32) + for j := 0; j < 32; j++ { + expectedValue[j] = byte(0x10 + i) + } + expectedValues[string(expectedValue)] = true + } + + for _, v := range values { + assert.True(t, expectedValues[string(v)], "Value should be present") + } +} + +func TestRocksDBStateStore_Iterator_EmptyPrefix(t *testing.T) { + factory, cleanup := setupRocksDBTest(t) + defer cleanup() + + ctx := context.Background() + store, err := factory.NewStateStore(nil) + require.NoError(t, err) + defer store.Close() + + // Insert multiple keys - use larger data set + numKeys := 80 + for i := 0; i < numKeys; i++ { + key := fmt.Sprintf("key%03d", i) + value := make([]byte, 128) + for j := 0; j < 128; j++ { + value[j] = byte(i*128 + j) + } + err := store.PutState(ctx, key, value) + require.NoError(t, err) + } + + // Create iterator with empty prefix (iterate all) + iterID, err := store.NewIterator([]byte{}) + require.NoError(t, err) + defer store.CloseIterator(iterID) + + // Iterate through all keys + count := 0 + hasMore, err := store.HasNext(iterID) + require.NoError(t, err) + for hasMore { + _, err := store.Next(iterID) + require.NoError(t, err) + count++ + hasMore, err = store.HasNext(iterID) + require.NoError(t, err) + } + + // Verify we got at least the expected number of values + assert.GreaterOrEqual(t, count, numKeys) +} + +func TestRocksDBStateStore_Iterator_ColumnFamily(t *testing.T) { + factory, cleanup := setupRocksDBTest(t) + defer cleanup() + + ctx := 
context.Background() + + // Create function with column family + func1 := &model.Function{ + Name: "test_func", + State: config.ConfigMap{"column-family": "test_cf"}, + } + + store, err := factory.NewStateStore(func1) + require.NoError(t, err) + defer store.Close() + + // Insert multiple keys - use larger data set + keyGroup := []byte{0x01} + key := []byte{0x02} + namespace := []byte{0x03} + + numKeys := 60 + for i := 0; i < numKeys; i++ { + userKey := make([]byte, 2) + userKey[0] = byte(i / 256) + userKey[1] = byte(i % 256) + value := make([]byte, 96) + for j := 0; j < 96; j++ { + value[j] = byte(0x10 + i + j) + } + err := store.Put(ctx, keyGroup, key, namespace, userKey, value) + require.NoError(t, err) + } + + // Create iterator + prefix := buildKeyForRocksDBTest(keyGroup, key, namespace, []byte{}) + iterID, err := store.NewIterator(prefix) + require.NoError(t, err) + defer store.CloseIterator(iterID) + + // Iterate through all keys + var values [][]byte + hasMore, err := store.HasNext(iterID) + require.NoError(t, err) + for hasMore { + value, err := store.Next(iterID) + require.NoError(t, err) + values = append(values, value) + hasMore, err = store.HasNext(iterID) + require.NoError(t, err) + } + + // Verify we got all values + assert.Len(t, values, numKeys) +} + +// Helper function for RocksDB tests +func buildKeyForRocksDBTest(keyGroup, key, namespace, userKey []byte) []byte { + result := make([]byte, 0, len(keyGroup)+len(key)+len(namespace)+len(userKey)) + result = append(result, keyGroup...) + result = append(result, key...) + result = append(result, namespace...) + result = append(result, userKey...) + return result +} diff --git a/go.mod b/go.mod index 8fe63292..cc11a114 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/go-logr/zapr v1.3.0 github.com/go-openapi/spec v0.21.0 github.com/go-playground/validator/v10 v10.11.1 + github.com/linxGnu/grocksdb v1.7.16 github.com/nats-io/nats.go v1.37.0 github.com/pkg/errors v0.9.1 github.com/spf13/cobra v1.8.0 diff --git a/go.sum b/go.sum index b0601130..02df72e7 100644 --- a/go.sum +++ b/go.sum @@ -136,6 +136,8 @@ github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w= github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= github.com/linkedin/goavro/v2 v2.12.0 h1:rIQQSj8jdAUlKQh6DttK8wCRv4t4QO09g1C4aBWXslg= github.com/linkedin/goavro/v2 v2.12.0/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk= +github.com/linxGnu/grocksdb v1.7.16 h1:Q2co1xrpdkr5Hx3Fp+f+f7fRGhQFQhvi/+226dtLmA8= +github.com/linxGnu/grocksdb v1.7.16/go.mod h1:JkS7pl5qWpGpuVb3bPqTz8nC12X3YtPZT+Xq7+QfQo4= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= diff --git a/scripts/build-rocksdb.sh b/scripts/build-rocksdb.sh new file mode 100755 index 00000000..9c0787b1 --- /dev/null +++ b/scripts/build-rocksdb.sh @@ -0,0 +1,590 @@ +#!/bin/bash +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -e + +# Script directory (relative to project root) +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PROJECT_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)" +# Place lib directory under bin directory to be consistent with other build artifacts +LIB_DIR="$PROJECT_ROOT/bin/function-stream/lib" +ROCKSDB_DIR="$LIB_DIR/rocksdb" +ROCKSDB_SRC_DIR="$ROCKSDB_DIR/src" +# RocksDB version compatible with grocksdb v1.7.16 +# According to testing, grocksdb v1.7.16 requires RocksDB 7.x version +# 7.10.2 is a stable 7.x version compatible with grocksdb v1.7.16 +ROCKSDB_VERSION="${ROCKSDB_VERSION:-7.10.2}" + +# Timing related variables +START_TIME=0 +STEP_START_TIME=0 + +echo_info() { + echo "[INFO] $1" +} + +echo_warn() { + echo "[WARN] $1" +} + +echo_error() { + echo "[ERROR] $1" +} + +echo_time() { + echo "[TIME] $1" +} + +# Start timer +start_timer() { + START_TIME=$(date +%s) + STEP_START_TIME=$START_TIME + echo_time "Total timing started: $(date '+%Y-%m-%d %H:%M:%S')" +} + +# Start step timing +step_start() { + STEP_START_TIME=$(date +%s) +} + +# Calculate and display duration +step_duration() { + local step_name="$1" + local end_time=$(date +%s) + local duration=$((end_time - STEP_START_TIME)) + local minutes=$((duration / 60)) + local seconds=$((duration % 60)) + + if [ $minutes -gt 0 ]; then + echo_time "${step_name} took: ${minutes}m${seconds}s (${duration}s)" + else + echo_time "${step_name} took: ${seconds}s" + fi + STEP_START_TIME=$end_time +} + +# Display total duration +total_duration() { + local end_time=$(date +%s) + local total_duration=$((end_time - START_TIME)) + local minutes=$((total_duration / 60)) + local seconds=$((total_duration % 60)) + + echo "" + echo_time "==========================================" + if [ $minutes -gt 0 ]; then + echo_time "Total time: ${minutes}m${seconds}s (${total_duration}s)" + else + echo_time "Total time: ${seconds}s" + fi + echo_time "End time: $(date '+%Y-%m-%d %H:%M:%S')" + echo_time "==========================================" +} + +# Detect operating system +detect_os() { + if [[ "$OSTYPE" == "darwin"* ]]; then + echo "darwin" + elif [[ "$OSTYPE" == "linux-gnu"* ]] || [[ "$OSTYPE" == "linux-musl"* ]]; then + echo "linux" + elif [[ -n "$WSL_DISTRO_NAME" ]] || grep -qEi "(Microsoft|WSL)" /proc/version 2>/dev/null; then + echo "linux" # WSL is treated as Linux + else + echo "unknown" + fi +} + +OS=$(detect_os) + +# Set library file name +STATIC_LIB_NAME="librocksdb.a" + +# Check dependencies +check_dependencies() { + local missing_deps=() + + if ! command -v git >/dev/null 2>&1; then + missing_deps+=("git") + fi + + # Linux/macOS need make + if ! command -v make >/dev/null 2>&1; then + missing_deps+=("make") + fi + + # Check compiler + if [[ "$OS" == "darwin"* ]]; then + if ! command -v clang++ >/dev/null 2>&1 && ! command -v g++ >/dev/null 2>&1; then + missing_deps+=("clang++ or g++") + fi + else + if ! 
command -v g++ >/dev/null 2>&1; then + missing_deps+=("g++") + fi + fi + + if [ ${#missing_deps[@]} -ne 0 ]; then + echo_error "Missing dependencies: ${missing_deps[*]}" + echo "" + if [[ "$OS" == "darwin"* ]]; then + echo "macOS installation guide:" + echo " Install Xcode Command Line Tools:" + echo " xcode-select --install" + else + echo "Linux installation guide:" + echo " Ubuntu/Debian: sudo apt-get install build-essential git" + echo " Fedora/RHEL: sudo dnf install gcc-c++ make git" + fi + exit 1 + fi +} + +# Download prebuilt RocksDB binaries (if available) +download_prebuilt_rocksdb() { + step_start + echo_info "Attempting to download prebuilt RocksDB binaries..." + + # Detect platform architecture + local arch="" + local lib_name="" + if [[ "$OS" == "darwin"* ]]; then + if [[ $(uname -m) == "arm64" ]]; then + arch="darwin-arm64" + lib_name="librocksdb.a" + else + arch="darwin-amd64" + lib_name="librocksdb.a" + fi + elif [[ "$OS" == "linux"* ]]; then + if [[ $(uname -m) == "x86_64" ]]; then + arch="linux-amd64" + lib_name="librocksdb.a" + elif [[ $(uname -m) == "aarch64" ]]; then + arch="linux-arm64" + lib_name="librocksdb.a" + else + return 1 + fi + else + return 1 + fi + + echo_info "Detected platform: $arch" + + # Prioritize checking if custom prebuilt source is configured + if [ -n "$ROCKSDB_PREBUILT_BASE_URL" ]; then + local prebuilt_url="${ROCKSDB_PREBUILT_BASE_URL}/rocksdb-${ROCKSDB_VERSION}-${arch}.tar.gz" + echo_info "Attempting to download from custom source: $prebuilt_url" + + if command -v curl >/dev/null 2>&1; then + if curl -L -f -o "rocksdb-${ROCKSDB_VERSION}-${arch}.tar.gz" "$prebuilt_url" 2>/dev/null; then + echo_info "✅ Successfully downloaded prebuilt version" + mkdir -p "$ROCKSDB_DIR/lib" "$ROCKSDB_DIR/include" + tar -xzf "rocksdb-${ROCKSDB_VERSION}-${arch}.tar.gz" -C "$ROCKSDB_DIR" 2>/dev/null || { + # If tar extraction fails, try extracting directly to current directory + tar -xzf "rocksdb-${ROCKSDB_VERSION}-${arch}.tar.gz" + if [ -f "lib/$lib_name" ] && [ -d "include" ]; then + mkdir -p "$ROCKSDB_DIR/lib" "$ROCKSDB_DIR/include" + cp "lib/$lib_name" "$ROCKSDB_DIR/lib/" + cp -r include/* "$ROCKSDB_DIR/include/" + fi + } + rm -f "rocksdb-${ROCKSDB_VERSION}-${arch}.tar.gz" + + # Verify if files were downloaded successfully + if [ -f "$ROCKSDB_DIR/lib/$lib_name" ] && [ -d "$ROCKSDB_DIR/include" ]; then + step_duration "Download prebuilt version" + echo_info "✅ Prebuilt RocksDB binaries are ready" + return 0 + fi + fi + fi + fi + + # Try GitHub Releases (example, users need to build and upload themselves) + # A standard GitHub Releases URL pattern can be added here + # For example: https://github.com/your-org/rocksdb-prebuilt/releases/download/v${ROCKSDB_VERSION}/rocksdb-${arch}.tar.gz + + # Currently RocksDB official doesn't provide prebuilt versions, so return failure and fall back to source compilation + echo_warn "Prebuilt RocksDB binaries not found, will use source compilation" + echo_info "Hint: You can specify a prebuilt source by setting environment variable ROCKSDB_PREBUILT_BASE_URL" + step_duration "Check prebuilt version" + return 1 +} + +# Download RocksDB source code (if prebuilt is not available) +download_rocksdb() { + step_start + mkdir -p "$ROCKSDB_DIR" + cd "$ROCKSDB_DIR" + TARBALL_NAME="rocksdb-${ROCKSDB_VERSION}.tar.gz" + ROCKSDB_URL="https://github.com/facebook/rocksdb/archive/refs/tags/v${ROCKSDB_VERSION}.tar.gz" + + # Check if source directory already exists and is not empty + if [ -d "$ROCKSDB_SRC_DIR" ] && [ "$(ls -A "$ROCKSDB_SRC_DIR" 
2>/dev/null)" ]; then + echo_info "RocksDB source code already exists, skipping download and extraction" + echo_info "Source directory: $ROCKSDB_SRC_DIR" + step_duration "Check source code" + return 0 + fi + + # Check if tarball already exists + if [ -f "$TARBALL_NAME" ]; then + echo_info "Found existing tarball: $TARBALL_NAME, skipping download" + step_duration "Check tarball" + # If tarball exists but source directory doesn't, continue with extraction logic + else + # Tarball doesn't exist, need to download + echo_info "Downloading RocksDB v${ROCKSDB_VERSION} source code..." + # Download tarball + echo_info "Downloading from ${ROCKSDB_URL}..." + if command -v curl >/dev/null 2>&1; then + if curl -L -f -o "$TARBALL_NAME" "$ROCKSDB_URL" 2>/dev/null; then + echo_info "Successfully downloaded RocksDB v${ROCKSDB_VERSION}" + else + echo_error "Download failed: ${ROCKSDB_URL}" + echo_error "Please check network connection or manually download tarball to: $ROCKSDB_DIR/$TARBALL_NAME" + exit 1 + fi + elif command -v wget >/dev/null 2>&1; then + if wget -O "$TARBALL_NAME" "$ROCKSDB_URL" 2>/dev/null; then + echo_info "Successfully downloaded RocksDB v${ROCKSDB_VERSION}" + else + echo_error "Download failed: ${ROCKSDB_URL}" + echo_error "Please check network connection or manually download tarball to: $ROCKSDB_DIR/$TARBALL_NAME" + exit 1 + fi + else + echo_error "curl or wget not found, cannot download" + echo_error "Please manually download ${ROCKSDB_URL} to $ROCKSDB_DIR/$TARBALL_NAME" + exit 1 + fi + step_duration "Download tarball" + fi + + # If directory exists but is empty, clean it (before extraction) + if [ -d "$ROCKSDB_SRC_DIR" ] && [ -z "$(ls -A "$ROCKSDB_SRC_DIR" 2>/dev/null)" ]; then + echo_warn "Source directory exists but is empty, cleaning..." + rm -rf "$ROCKSDB_SRC_DIR" + fi + + # Check if already extracted (source directory exists) + if [ -d "src" ] && [ "$(ls -A src 2>/dev/null)" ]; then + echo_info "Source directory already exists, skipping extraction" + echo_info "Source directory: $ROCKSDB_SRC_DIR" + step_duration "Check extraction status" + else + # Try extracting tarball + echo_info "Starting extraction of $TARBALL_NAME..." + if tar -xzf "$TARBALL_NAME" 2>/dev/null; then + if [ -d "rocksdb-${ROCKSDB_VERSION}" ]; then + mv "rocksdb-${ROCKSDB_VERSION}" src + echo_info "Successfully extracted RocksDB v${ROCKSDB_VERSION}" + # Keep tarball after successful extraction (for next time use) + echo_info "Tarball preserved: $TARBALL_NAME (can be used directly next time)" + step_duration "Extract tarball" + else + echo_error "Expected directory not found after extraction: rocksdb-${ROCKSDB_VERSION}" + echo_error "Tarball may be corrupted, deleted: $TARBALL_NAME" + rm -f "$TARBALL_NAME" + exit 1 + fi + else + echo_error "Extraction failed, tarball may be corrupted" + # Only delete corrupted tarball when extraction fails + rm -f "$TARBALL_NAME" + echo_error "Deleted corrupted tarball: $TARBALL_NAME" + exit 1 + fi + fi + + step_duration "Download source code" + echo_info "RocksDB download completed" +} + +# Build RocksDB (Linux/macOS) +build_rocksdb_unix() { + step_start + echo_info "Starting to build RocksDB (Unix systems)..." 
+ cd "$ROCKSDB_SRC_DIR" + + # Detect CPU core count + if [[ "$OS" == "darwin"* ]]; then + CPU_COUNT=$(sysctl -n hw.ncpu 2>/dev/null || echo 4) + else + CPU_COUNT=$(nproc 2>/dev/null || echo 4) + fi + + echo_info "Using ${CPU_COUNT} CPU cores for compilation" + + + # Build static library with minimal features for size optimization + # Enable ROCKSDB_LITE mode - removes non-core features to minimize size + # Disable optional features to reduce library size: + # - ROCKSDB_LITE=1: Enable lite mode (removes advanced features) + # - WITH_JEMALLOC=0: Disable jemalloc (use system malloc) + # - WITH_MD_LIBRARY=0: Disable MD library + # - WITH_NUMA=0: Disable NUMA support + # - WITH_TBB=0: Disable Intel TBB + # - ROCKSDB_BUILD_SHARED=0: Don't build shared libraries + # - WITH_TOOLS=0: Don't build tools + # - WITH_EXAMPLES=0: Don't build examples + # - WITH_TESTS=0: Don't build tests + # Disable all compression algorithms to minimize size (only "none" compression available): + # - USE_SNAPPY=0: Disable Snappy compression + # - USE_LZ4=0: Disable LZ4 compression + # - USE_ZSTD=0: Disable ZSTD compression + # - USE_ZLIB=0: Disable Zlib compression + # - USE_BZIP2=0: Disable BZip2 compression + # - OPT="-Os": Optimize for size + echo_info "Executing: make -j\"${CPU_COUNT}\" ROCKSDB_LITE=1 PORTABLE=1 USE_RTTI=1 DEBUG_LEVEL=0 DISABLE_WARNING_AS_ERROR=1 OPT=\"-Os\" WITH_JEMALLOC=0 WITH_MD_LIBRARY=0 WITH_NUMA=0 WITH_TBB=0 ROCKSDB_BUILD_SHARED=0 WITH_TOOLS=0 WITH_EXAMPLES=0 WITH_TESTS=0 USE_SNAPPY=0 USE_LZ4=0 USE_ZSTD=0 USE_ZLIB=0 USE_BZIP2=0 static_lib" + + if ! make -j"${CPU_COUNT}" ROCKSDB_LITE=1 PORTABLE=1 USE_RTTI=1 DEBUG_LEVEL=0 DISABLE_WARNING_AS_ERROR=1 OPT="-Os" WITH_JEMALLOC=0 WITH_MD_LIBRARY=0 WITH_NUMA=0 WITH_TBB=0 ROCKSDB_BUILD_SHARED=0 WITH_TOOLS=0 WITH_EXAMPLES=0 WITH_TESTS=0 USE_SNAPPY=0 USE_LZ4=0 USE_ZSTD=0 USE_ZLIB=0 USE_BZIP2=0 static_lib; then + echo_error "❌ Build failed" + echo_error "Debug information:" + echo_error " Working directory: $(pwd)" + echo_error " make_config.mk exists: $([ -f make_config.mk ] && echo 'yes' || echo 'no')" + if [ -f "make_config.mk" ]; then + echo_error " make_config.mk content preview (first 30 lines):" + head -30 make_config.mk | sed 's/^/ /' + fi + echo_error " Compression library detection (in make_config.mk):" + grep -E "SNAPPY|LZ4|ZSTD|snappy|lz4|zstd|-DSNAPPY|-DLZ4|-DZSTD" make_config.mk 2>/dev/null | head -10 | sed 's/^/ /' || echo " Not found" + exit 1 + fi + + # Note: ROCKSDB_LITE mode enabled - minimal features only + if [ -f "$STATIC_LIB_NAME" ]; then + echo_info "✅ Library file compiled successfully (ROCKSDB_LITE mode - minimal build)" + echo_info "Note: ROCKSDB_LITE mode enabled - only core features available" + echo_info "Note: Only 'none' compression type is available. Compression algorithms are disabled to reduce size." + fi + + step_duration "Build source code" + + step_start + # Create output directories + mkdir -p "$ROCKSDB_DIR/lib" "$ROCKSDB_DIR/include" + + # Copy library file and strip debug symbols to reduce size + if [ -f "$STATIC_LIB_NAME" ]; then + cp "$STATIC_LIB_NAME" "$ROCKSDB_DIR/lib/" + # Strip debug symbols to reduce library size + if command -v strip >/dev/null 2>&1; then + echo_info "Stripping debug symbols to reduce library size..." 
+ strip "$ROCKSDB_DIR/lib/$STATIC_LIB_NAME" 2>/dev/null || { + echo_warn "⚠️ Failed to strip library, continuing anyway" + } + fi + echo_info "Copied static library to $ROCKSDB_DIR/lib/$STATIC_LIB_NAME" + else + echo_error "Build failed: $STATIC_LIB_NAME not found" + exit 1 + fi + + # Copy header files + if [ -d "include" ]; then + cp -r include/* "$ROCKSDB_DIR/include/" + echo_info "Copied header files to $ROCKSDB_DIR/include" + else + echo_error "Build failed: include directory not found" + exit 1 + fi + + step_duration "Package files" + + # Check if library files were successfully generated, only clean source code after confirming binaries are generated + if [ -f "$ROCKSDB_DIR/lib/$STATIC_LIB_NAME" ] && [ -d "$ROCKSDB_DIR/include" ]; then + step_start + echo_info "✅ Binaries successfully generated, cleaning source code and build files..." + # Clean source directory and build intermediate files (preserve copied library and header files) + cd "$ROCKSDB_SRC_DIR" + # Clean compilation intermediate files (optional, saves space) + if [ -f "Makefile" ]; then + make clean > /dev/null 2>&1 || true + fi + # Clean entire source directory + cd "$ROCKSDB_DIR" + if [ -d "src" ]; then + rm -rf "src" + echo_info "Source directory cleaned" + fi + step_duration "Clean source code" + else + echo_warn "⚠️ Library files not successfully generated, keeping source directory for debugging" + echo_warn "Expected library file location: $ROCKSDB_DIR/lib/$STATIC_LIB_NAME" + echo_warn "Expected header file location: $ROCKSDB_DIR/include" + fi + + echo_info "RocksDB build completed" + echo_info "Library file location: $ROCKSDB_DIR/lib/$STATIC_LIB_NAME" + echo_info "Header file location: $ROCKSDB_DIR/include" +} + +# Build RocksDB +build_rocksdb() { + # Check if already built (binary files exist) + # If binary files already exist, skip compilation and don't clean source code + if [ -f "$ROCKSDB_DIR/lib/$STATIC_LIB_NAME" ] && [ -d "$ROCKSDB_DIR/include" ]; then + echo_info "✅ RocksDB binaries already exist, skipping build step" + echo_info "Library file: $ROCKSDB_DIR/lib/$STATIC_LIB_NAME" + echo_info "Header files: $ROCKSDB_DIR/include" + if [ -d "$ROCKSDB_SRC_DIR" ]; then + echo_info "Source directory preserved: $ROCKSDB_SRC_DIR (not cleaned because binaries exist)" + fi + echo_info "To rebuild, please delete $ROCKSDB_DIR/lib directory" + return 0 + fi + + # Try to download prebuilt version first + if download_prebuilt_rocksdb; then + echo_info "✅ Successfully using prebuilt RocksDB binaries" + return 0 + fi + + # If no prebuilt version, compile from source + echo_info "Starting to build RocksDB from source..." + + # Ensure source code is downloaded (skip download if already exists) + download_rocksdb + + # Check if source directory exists + if [ ! -d "$ROCKSDB_SRC_DIR" ] || [ -z "$(ls -A "$ROCKSDB_SRC_DIR" 2>/dev/null)" ]; then + echo_error "RocksDB source directory does not exist or is empty: $ROCKSDB_SRC_DIR" + echo_error "Please check the download process or manually download the source code" + exit 1 + fi + + build_rocksdb_unix +} + +# Clean (complete cleanup, including tarball) +clean() { + if [ -d "$ROCKSDB_DIR" ]; then + echo_info "Cleaning RocksDB build files (including tarball)..." 
+ rm -rf "$ROCKSDB_DIR" + echo_info "Cleanup completed" + echo_info "Cleaned: binary library files, header file directory, source directory, tarball" + else + echo_info "RocksDB build files do not exist, no cleanup needed" + fi +} + +# Clean intermediate files (keep binaries, clean source code and tarball) +# For daily development use - keeps include directory for future builds +clean_intermediate() { + TARBALL_NAME="rocksdb-${ROCKSDB_VERSION}.tar.gz" + TARBALL_PATH="$ROCKSDB_DIR/$TARBALL_NAME" + + if [ ! -d "$ROCKSDB_DIR" ] && [ ! -d "$ROCKSDB_DIR/src" ] && [ ! -f "$TARBALL_PATH" ]; then + echo_info "RocksDB intermediate files do not exist, no cleanup needed" + return 0 + fi + + echo_info "Cleaning RocksDB intermediate files (keeping binaries and headers for development)..." + + # Clean tarball + if [ -f "$TARBALL_PATH" ]; then + echo_info "Cleaning tarball: $TARBALL_NAME" + rm -f "$TARBALL_PATH" + fi + + # Clean source directory + if [ -d "$ROCKSDB_DIR/src" ]; then + echo_info "Cleaning source directory: $ROCKSDB_DIR/src" + rm -rf "$ROCKSDB_DIR/src" + fi + + echo_info "Intermediate files cleanup completed" + echo_info "Cleaned: source directory, tarball" + echo_info "Preserved: binary library file (librocksdb.a), header file directory (for development)" +} + +# Clean for packaging (also clean include directory) +# Use this when building final binary package - removes include as it's not needed in package +clean_for_package() { + TARBALL_NAME="rocksdb-${ROCKSDB_VERSION}.tar.gz" + TARBALL_PATH="$ROCKSDB_DIR/$TARBALL_NAME" + + echo_info "Cleaning RocksDB files for packaging..." + + # Clean tarball + if [ -f "$TARBALL_PATH" ]; then + echo_info "Cleaning tarball: $TARBALL_NAME" + rm -f "$TARBALL_PATH" + fi + + # Clean source directory + if [ -d "$ROCKSDB_DIR/src" ]; then + echo_info "Cleaning source directory: $ROCKSDB_DIR/src" + rm -rf "$ROCKSDB_DIR/src" + fi + + # Clean include directory (not needed in final package) + if [ -d "$ROCKSDB_DIR/include" ]; then + echo_info "Cleaning include directory: $ROCKSDB_DIR/include" + rm -rf "$ROCKSDB_DIR/include" + fi + + echo_info "Package cleanup completed" + echo_info "Cleaned: source directory, tarball, include directory" + echo_info "Preserved: binary library file (librocksdb.a)" +} + +# Main function +main() { + cd "$PROJECT_ROOT" + + echo_info "Detected operating system: $OS" + + case "${1:-build}" in + build) + start_timer + + step_start + echo_info "Detected operating system: $OS" + step_duration "Initialization" + + step_start + check_dependencies + step_duration "Check dependencies" + + build_rocksdb + + echo_info "✅ RocksDB build completed!" 
+ echo_info "Library file: $ROCKSDB_DIR/lib/$STATIC_LIB_NAME" + echo_info "Header files: $ROCKSDB_DIR/include" + + total_duration + ;; + clean) + clean + ;; + clean-intermediate) + clean_intermediate + ;; + clean-for-package) + clean_for_package + ;; + *) + echo "Usage: $0 [build|clean|clean-intermediate|clean-for-package]" + echo " build: Download and build RocksDB (default)" + echo " clean: Clean downloaded and compiled files (including tarball)" + echo " clean-intermediate: Clean intermediate files (keep binaries and headers for development)" + echo " clean-for-package: Clean for packaging (remove include directory, keep only binaries)" + exit 1 + ;; + esac +} + +main "$@" diff --git a/scripts/setup-rocksdb-deps.sh b/scripts/setup-rocksdb-deps.sh new file mode 100755 index 00000000..697caac8 --- /dev/null +++ b/scripts/setup-rocksdb-deps.sh @@ -0,0 +1,266 @@ +#!/bin/bash +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -e + +echo_info() { + echo "[INFO] $1" +} + +echo_warn() { + echo "[WARN] $1" +} + +echo_error() { + echo "[ERROR] $1" +} + +# Detect operating system +detect_os() { + case "$(uname -s)" in + Darwin*) + echo "darwin" + ;; + Linux*) + echo "linux" + ;; + *) + echo "unknown" + ;; + esac +} + +# Detect CPU architecture +detect_arch() { + local arch=$(uname -m) + case "$arch" in + x86_64|amd64) + echo "x86_64" + ;; + arm64|aarch64) + echo "arm64" + ;; + armv7*) + echo "armv7" + ;; + arm*) + echo "arm" + ;; + *) + echo "$arch" + ;; + esac +} + +# Detect Homebrew path (based on architecture) +detect_homebrew_prefix() { + if [ -d "/opt/homebrew" ]; then + # Apple Silicon (arm64) + echo "/opt/homebrew" + elif [ -d "/usr/local" ]; then + # Intel Mac (x86_64) or others + echo "/usr/local" + else + echo "" + fi +} + +# Detect Linux library path (based on architecture) +detect_linux_lib_dir() { + local arch=$(detect_arch) + case "$arch" in + x86_64) + if [ -d "/usr/lib/x86_64-linux-gnu" ]; then + echo "/usr/lib/x86_64-linux-gnu" + elif [ -d "/usr/lib64" ]; then + echo "/usr/lib64" + else + echo "/usr/lib" + fi + ;; + arm64|aarch64) + if [ -d "/usr/lib/aarch64-linux-gnu" ]; then + echo "/usr/lib/aarch64-linux-gnu" + elif [ -d "/usr/lib64" ]; then + echo "/usr/lib64" + else + echo "/usr/lib" + fi + ;; + armv7|arm) + if [ -d "/usr/lib/arm-linux-gnueabihf" ]; then + echo "/usr/lib/arm-linux-gnueabihf" + elif [ -d "/usr/lib/arm-linux-gnueabi" ]; then + echo "/usr/lib/arm-linux-gnueabi" + else + echo "/usr/lib" + fi + ;; + *) + echo "/usr/lib" + ;; + esac +} + +# Check if library exists +check_library() { + local lib_name=$1 + local lib_path=$2 + + if [ -f "$lib_path" ] || [ -d "$lib_path" ]; then + return 0 + else + return 1 + fi +} + +# Install dependencies on macOS (using Homebrew) +# Note: Compression libraries are disabled, so no dependencies needed for minimal build +install_deps_darwin() { + echo_info "Checking macOS dependencies..." 
+ echo_info "Note: Compression libraries are disabled in minimal build, no dependencies required" + echo_info "✅ No dependencies needed (compression disabled)" + return 0 +} + +# Install dependencies on Linux (using package manager) +# Note: Compression libraries are disabled, so no dependencies needed for minimal build +install_deps_linux() { + echo_info "Checking Linux dependencies..." + echo_info "Note: Compression libraries are disabled in minimal build, no dependencies required" + echo_info "✅ No dependencies needed (compression disabled)" + return 0 +} + +# Find static library file by checking common paths +# Arguments: library name (e.g., "bz2" or "z"), library directory +# Returns: full path to static library if found, empty string otherwise +find_static_library() { + local lib_name=$1 + local lib_dir=$2 + local static_paths=( + "${lib_dir}/lib${lib_name}.a" + "/usr/lib/x86_64-linux-gnu/lib${lib_name}.a" + "/usr/lib64/lib${lib_name}.a" + "/usr/lib/lib${lib_name}.a" + ) + + for path in "${static_paths[@]}"; do + if [ -f "$path" ]; then + echo "$path" + return 0 + fi + done + + return 1 +} + +# Generate CGO_LDFLAGS for linking RocksDB and dependencies +# Note: RocksDB's compressed_secondary_cache requires bzip2 and zlib +# These libraries are statically linked to avoid runtime dependencies +generate_cgo_ldflags() { + local os=$(detect_os) + local rocksdb_lib_dir=$1 + local output_file=$2 + local rocksdb_static="${rocksdb_lib_dir}/librocksdb.a" + + if [ "$os" = "darwin" ]; then + # macOS: Statically link RocksDB, dynamically link system libraries + # Using grocksdb_no_link tag, so we provide all necessary libraries + # macOS system libraries (bz2, z) are available on all macOS systems + if [ -f "$rocksdb_static" ]; then + # Force load static library to ensure all symbols are included + echo "-Wl,-force_load,${rocksdb_static} -pthread -lstdc++ -ldl -lbz2 -lz" > "$output_file" + else + # Fallback: use library path (should not happen if setup-rocksdb completed) + echo "-L${rocksdb_lib_dir} -lrocksdb -pthread -lstdc++ -ldl -lbz2 -lz" > "$output_file" + fi + elif [ "$os" = "linux" ]; then + # Linux: Fully static link all libraries for maximum portability + local lib_dir=$(detect_linux_lib_dir) + local bz2_static + local z_static + + # Try to find static library files + bz2_static=$(find_static_library "bz2" "$lib_dir" 2>/dev/null || echo "") + z_static=$(find_static_library "z" "$lib_dir" 2>/dev/null || echo "") + + # Build link flags based on available static libraries + if [ -n "$bz2_static" ] && [ -n "$z_static" ] && [ -f "$rocksdb_static" ]; then + # Best case: all static libraries found, create fully static binary + echo "${rocksdb_static} ${bz2_static} ${z_static} -static -pthread -lstdc++ -ldl" > "$output_file" + elif [ -f "$rocksdb_static" ]; then + # RocksDB static found, but system libs not found - use -static flag + local lib_search="" + [ -d "$lib_dir" ] && [ "$lib_dir" != "/usr/lib" ] && lib_search="-L${lib_dir}" + echo "${rocksdb_static} ${lib_search} -static -lbz2 -lz -pthread -lstdc++ -ldl" > "$output_file" + else + # Fallback: use library paths with static linking flags + local lib_search="-L${rocksdb_lib_dir}" + [ -d "$lib_dir" ] && [ "$lib_dir" != "/usr/lib" ] && lib_search="${lib_search} -L${lib_dir}" + echo "${lib_search} -Wl,-Bstatic -lrocksdb -lbz2 -lz -Wl,-Bdynamic -static -pthread -lstdc++ -ldl" > "$output_file" + fi + else + echo_error "Unsupported operating system: $os" + exit 1 + fi +} + +# Main function +main() { + local os=$(detect_os) + + case "$1" in + 
+        install)
+            echo_info "Starting RocksDB dependencies installation..."
+            if [ "$os" = "darwin" ]; then
+                install_deps_darwin
+            elif [ "$os" = "linux" ]; then
+                install_deps_linux
+            else
+                echo_error "Unsupported operating system: $os"
+                exit 1
+            fi
+            ;;
+        check)
+            echo_info "Checking dependencies status..."
+            if [ "$os" = "darwin" ]; then
+                install_deps_darwin
+            elif [ "$os" = "linux" ]; then
+                install_deps_linux
+            else
+                echo_error "Unsupported operating system: $os"
+                exit 1
+            fi
+            ;;
+        ldflags)
+            if [ -z "$2" ] || [ -z "$3" ]; then
+                echo_error "Usage: $0 ldflags <rocksdb-lib-dir> <output-file>"
+                exit 1
+            fi
+            generate_cgo_ldflags "$2" "$3"
+            ;;
+        *)
+            echo "Usage: $0 {install|check|ldflags} [args...]"
+            echo ""
+            echo "Commands:"
+            echo " install - Install missing dependencies"
+            echo " check - Check dependencies status"
+            echo " ldflags <rocksdb-lib-dir> <output-file> - Generate CGO_LDFLAGS and save them to a file"
+            exit 1
+            ;;
+    esac
+}
+
+main "$@"
+
diff --git a/server/server.go b/server/server.go
index c316931a..42f2633f 100644
--- a/server/server.go
+++ b/server/server.go
@@ -222,6 +222,8 @@ func DefaultStateStoreProvider(c *StateStoreConfig) (api.StateStoreFactory, erro
 	switch strings.ToLower(*c.Type) {
 	case common.StateStorePebble:
 		return statestore.NewPebbleStateStoreFactory(c.Config)
+	case common.StateStoreRocksDB:
+		return newRocksDBStateStoreFactory(c.Config)
 	}
 	return statestore.NewDefaultPebbleStateStoreFactory()
 }
diff --git a/server/server_lite.go b/server/server_lite.go
new file mode 100644
index 00000000..2d2f6e0e
--- /dev/null
+++ b/server/server_lite.go
@@ -0,0 +1,30 @@
+//go:build !rocksdb
+
+/*
+ * Copyright 2024 Function Stream Org.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package server
+
+import (
+	"fmt"
+
+	"github.com/functionstream/function-stream/common/config"
+	"github.com/functionstream/function-stream/fs/api"
+)
+
+func newRocksDBStateStoreFactory(cfg config.ConfigMap) (api.StateStoreFactory, error) {
+	return nil, fmt.Errorf("RocksDB support is not compiled into this build; rebuild with -tags rocksdb or use the Pebble state store")
+}
diff --git a/server/server_rocksdb.go b/server/server_rocksdb.go
new file mode 100644
index 00000000..733822d7
--- /dev/null
+++ b/server/server_rocksdb.go
@@ -0,0 +1,29 @@
+//go:build rocksdb
+
+/*
+ * Copyright 2024 Function Stream Org.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package server
+
+import (
+	"github.com/functionstream/function-stream/common/config"
+	"github.com/functionstream/function-stream/fs/api"
+	"github.com/functionstream/function-stream/fs/statestore"
+)
+
+func newRocksDBStateStoreFactory(cfg config.ConfigMap) (api.StateStoreFactory, error) {
+	return statestore.NewRocksDBStateStoreFactory(cfg)
+}
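+
+// Usage sketch (hypothetical YAML; assumes common.StateStoreRocksDB is the
+// string "rocksdb" and that the state store config section is named as below):
+//
+//	state-store:
+//	  type: rocksdb
+//	  config: {}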