package weed_server

import (
	"context"
	"fmt"
	"io"

	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/backend"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
)

// VolumeIncrementalCopy streams a range of the requested volume's data backend
// to the client. The range starts at the offset located for req.SinceNs and
// ends at the first value returned by v.FileStat (presumably the data file
// size — confirm against Volume.FileStat).
//
// It returns an error when the volume is not present in the store or when the
// offset lookup fails. When the lookup reports isLastOne it returns nil without
// sending anything (presumably because nothing newer than req.SinceNs exists).
// Otherwise it returns whatever sendFileContent returns.
func (vs *VolumeServer) VolumeIncrementalCopy(req *volume_server_pb.VolumeIncrementalCopyRequest, stream volume_server_pb.VolumeServer_VolumeIncrementalCopyServer) error {

	v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
	if v == nil {
		return fmt.Errorf("not found volume id %d", req.VolumeId)
	}

	// The stop offset is taken before the search so the copied range has a
	// fixed upper bound for the rest of this call. The two ignored values are
	// not needed here.
	stopOffset, _, _ := v.FileStat()
	foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(req.SinceNs)
	if err != nil {
		// %w keeps the cause inspectable with errors.Is / errors.As; the
		// rendered message is the same as with %s.
		return fmt.Errorf("fail to locate by appendAtNs %d: %w", req.SinceNs, err)
	}

	if isLastOne {
		return nil
	}

	startOffset := foundOffset.ToActualOffset()

	// A single 2 MiB buffer is reused for every chunk read from the backend.
	buf := make([]byte, 1024*1024*2)
	return sendFileContent(v.DataBackend, buf, startOffset, int64(stopOffset), stream)

}

// VolumeSyncStatus returns the sync status reported by the volume identified
// by req.VolumeId. It returns a nil response and an error when the store has
// no such volume. The ctx argument is not used by this handler.
func (vs *VolumeServer) VolumeSyncStatus(ctx context.Context, req *volume_server_pb.VolumeSyncStatusRequest) (*volume_server_pb.VolumeSyncStatusResponse, error) {
	if volume := vs.store.GetVolume(needle.VolumeId(req.VolumeId)); volume != nil {
		return volume.GetVolumeSyncStatus(), nil
	}
	return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
}

// sendFileContent streams the bytes in [startOffset, stopOffset) of datBackend
// to stream, one response message per chunk, using buf as the read buffer.
//
// Each read is capped at the bytes remaining before stopOffset, so nothing at
// or beyond stopOffset is sent even if the backend holds more data. The
// position advances by the number of bytes actually read.
//
// Returns:
//   - nil when the whole range was sent, when the range is empty, or when the
//     backend reported io.EOF before stopOffset (bytes read so far are still
//     sent; an early EOF is not treated as a failure, as before);
//   - io.ErrShortBuffer when buf is empty but the range is not;
//   - io.ErrNoProgress when a read returns no bytes and no error;
//   - any other read error, or the first error returned by stream.Send.
func sendFileContent(datBackend backend.BackendStorageFile, buf []byte, startOffset, stopOffset int64, stream volume_server_pb.VolumeServer_VolumeIncrementalCopyServer) error {
	if startOffset >= stopOffset {
		return nil
	}
	if len(buf) == 0 {
		// Without buffer space the loop below could never advance.
		return io.ErrShortBuffer
	}

	for offset := startOffset; offset < stopOffset; {
		// Shrink the final chunk so the read cannot run past stopOffset.
		chunk := buf
		if remaining := stopOffset - offset; remaining < int64(len(chunk)) {
			chunk = chunk[:remaining]
		}

		n, readErr := datBackend.ReadAt(chunk, offset)
		if readErr != nil && readErr != io.EOF {
			return readErr
		}

		if n > 0 {
			resp := &volume_server_pb.VolumeIncrementalCopyResponse{
				FileContent: chunk[:n],
			}
			if sendErr := stream.Send(resp); sendErr != nil {
				return sendErr
			}
		}

		if readErr == io.EOF {
			// The backend has no more data; further reads would only
			// produce empty messages.
			return nil
		}
		if n == 0 {
			// A ReaderAt must not return (0, nil) for a non-empty buffer;
			// bail out rather than spin forever.
			return io.ErrNoProgress
		}
		offset += int64(n)
	}
	return nil
}

Related articles

Seaweed File System postgres2_store

package postgres2 import ( "context" "database/sql" "fmt" "strconv" "time" _ "github.com/lib/pq" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/filer/abstract_sql" "github.com/seaweedfs/seaweedfs/weed/filer/po

Seaweed File System needle_id_type

package types import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/util" "strconv" ) type NeedleId uint64 const ( NeedleIdSize = 8 NeedleIdEmpty = 0 ) func NeedleIdToBytes(bytes []byte, needleId NeedleId) { util.Uint64toBytes(bytes, uint64(need

Seaweed File System Serializer

/* * Copyright 2009 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://ww

Seaweed File System track_sync_offset

package remote_storage import ( "context" "errors" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/grpc" ) const ( SyncKeyPrefix = "remote.s

Seaweed File System webdav

package command import ( "context" "fmt" "net/http" "os" "os/user" "strconv" "time" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/s

Seaweed File System KryoBackedEncoder

/* * Copyright 2013 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://ww

Seaweed File System rocksdb_store_kv

//go:build rocksdb // +build rocksdb package rocksdb import ( "context" "fmt" "github.com/seaweedfs/seaweedfs/weed/filer" ) func (store *RocksDBStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { err = store.db.Put(store.wo

Seaweed File System UserController

package org.apache.hadoop.seaweed.ftp.controller; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.apache.ftpserver.ftplet.User; import org.apache.ftpserver.usermanager.Md5PasswordEncryptor; import org.apache.ftp

Seaweed File System ec_shard

package erasure_coding import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/storage/types" "os" "path" "strconv" "strings" "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/storage/needle" ) type ShardId uint8 ty

Seaweed File System mongodb_store

package mongodb import ( "context" "fmt" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" "go.mongodb.org/mongo-dri