diff --git a/config/config.go b/config/config.go index 5bd417c14..3a73addfb 100644 --- a/config/config.go +++ b/config/config.go @@ -824,7 +824,7 @@ func InitServer(ctx context.Context, currentServers ServerType) error { viper.SetDefault("Cache.RunLocation", filepath.Join("/run", "pelican", "xrootd", "cache")) } viper.SetDefault("Cache.DataLocation", "/run/pelican/xcache") - viper.SetDefault("FileCache.RunLocation", filepath.Join("/run", "pelican", "filecache")) + viper.SetDefault("LocalCache.RunLocation", filepath.Join("/run", "pelican", "localcache")) viper.SetDefault("Origin.Multiuser", true) viper.SetDefault("Director.GeoIPLocation", "/var/cache/pelican/maxmind/GeoLite2-City.mmdb") @@ -864,12 +864,12 @@ func InitServer(ctx context.Context, currentServers ServerType) error { cleanupDirOnShutdown(ctx, runtimeDir) } viper.SetDefault("Cache.DataLocation", filepath.Join(runtimeDir, "xcache")) - viper.SetDefault("FileCache.RunLocation", filepath.Join(runtimeDir, "cache")) + viper.SetDefault("LocalCache.RunLocation", filepath.Join(runtimeDir, "cache")) viper.SetDefault("Origin.Multiuser", false) } - fcRunLocation := viper.GetString("FileCache.RunLocation") - viper.SetDefault("FileCache.Socket", filepath.Join(fcRunLocation, "cache.sock")) - viper.SetDefault("FileCache.DataLocation", filepath.Join(fcRunLocation, "cache")) + fcRunLocation := viper.GetString("LocalCache.RunLocation") + viper.SetDefault("LocalCache.Socket", filepath.Join(fcRunLocation, "cache.sock")) + viper.SetDefault("LocalCache.DataLocation", filepath.Join(fcRunLocation, "cache")) // Any platform-specific paths should go here err := InitServerOSDefaults() diff --git a/docs/parameters.yaml b/docs/parameters.yaml index c9d08175b..32e5b26f0 100644 --- a/docs/parameters.yaml +++ b/docs/parameters.yaml @@ -618,49 +618,55 @@ default: path components: ["origin"] --- ############################ -# File-cache configs # +# Local cache configs # ############################ -name: FileCache.RunLocation +name: LocalCache.RunLocation description: >- - The directory for the runtime files of the file cache -type: string -root_default: /run/pelican/filecache -default: $XDG_RUNTIME_DIR/pelican/filecache + The directory for the runtime files of the local cache +type: filename +root_default: /run/pelican/localcache +default: $XDG_RUNTIME_DIR/pelican/localcache +components: ["localcache"] --- -name: FileCache.DataLocation +name: LocalCache.DataLocation description: >- The directory for the location of the cache data files - this is where the actual data in the cache is stored - for the file cache. -type: string -default: $PELICAN_FILECACHE_RUNLOCATION/cache + for the local cache. +type: filename +default: $PELICAN_LOCALCACHE_RUNLOCATION/cache +components: ["localcache"] --- -name: FileCache.Socket +name: LocalCache.Socket description: >- - The location of the socket used for client communication for the file cache -type: string -default: $PELICAN_FILECACHE_RUNLOCATION/cache.sock + The location of the socket used for client communication for the local cache +type: filename +default: $PELICAN_LOCALCACHE_RUNLOCATION/cache.sock +components: ["localcache"] --- -name: FileCache.Size +name: LocalCache.Size description: >- - The maximum size of the file cache. If not set, it is assumed the entire device can be used. + The maximum size of the local cache. If not set, it is assumed the entire device can be used. type: string default: 0 +components: ["localcache"] --- -name: FileCache.HighWaterMarkPercentage +name: LocalCache.HighWaterMarkPercentage description: >- A percentage value where the cache cleanup routines will triggered. Once the cache usage of completed files hits the high water mark, files will be deleted until the usage hits the low water mark. type: int default: 95 +components: ["localcache"] --- -name: FileCache.LowWaterMarkPercentage +name: LocalCache.LowWaterMarkPercentage description: >- A percentage value where the cache cleanup routines will complete. Once the cache usage of completed files hits the high water mark, files will be deleted until the usage hits the low water mark. type: int default: 85 +components: ["localcache"] --- ############################ # Cache-level configs # diff --git a/launchers/launcher.go b/launchers/launcher.go index 6e5f37b05..2b76455c7 100644 --- a/launchers/launcher.go +++ b/launchers/launcher.go @@ -34,7 +34,7 @@ import ( "github.com/pelicanplatform/pelican/broker" "github.com/pelicanplatform/pelican/config" - simple_cache "github.com/pelicanplatform/pelican/file_cache" + "github.com/pelicanplatform/pelican/local_cache" "github.com/pelicanplatform/pelican/origin_ui" "github.com/pelicanplatform/pelican/param" "github.com/pelicanplatform/pelican/server_ui" @@ -251,7 +251,7 @@ func LaunchModules(ctx context.Context, modules config.ServerType) (context.Canc if modules.IsEnabled(config.LocalCacheType) { log.Debugln("Starting local cache listener") - if err := simple_cache.LaunchListener(ctx, egrp); err != nil { + if err := local_cache.LaunchListener(ctx, egrp); err != nil { log.Errorln("Failure when starting the local cache listener:", err) return shutdownCancel, err } diff --git a/file_cache/cache_api.go b/local_cache/cache_api.go similarity index 96% rename from file_cache/cache_api.go rename to local_cache/cache_api.go index b95c12ebe..3d861353e 100644 --- a/file_cache/cache_api.go +++ b/local_cache/cache_api.go @@ -16,7 +16,7 @@ * ***************************************************************/ -package simple_cache +package local_cache import ( "context" @@ -39,7 +39,7 @@ import ( // Launch the unix socket listener as a separate goroutine func LaunchListener(ctx context.Context, egrp *errgroup.Group) error { - socketName := param.FileCache_Socket.GetString() + socketName := param.LocalCache_Socket.GetString() if err := os.MkdirAll(filepath.Dir(socketName), fs.FileMode(0755)); err != nil { return errors.Wrap(err, "failed to create socket directory") } @@ -48,7 +48,7 @@ func LaunchListener(ctx context.Context, egrp *errgroup.Group) error { if err != nil { return err } - sc, err := NewSimpleCache(ctx, egrp) + sc, err := NewLocalCache(ctx, egrp) if err != nil { return err } diff --git a/file_cache/cache_authz.go b/local_cache/cache_authz.go similarity index 99% rename from file_cache/cache_authz.go rename to local_cache/cache_authz.go index cbdaa932a..19804b9ac 100644 --- a/file_cache/cache_authz.go +++ b/local_cache/cache_authz.go @@ -16,7 +16,7 @@ * ***************************************************************/ -package simple_cache +package local_cache import ( "context" diff --git a/file_cache/cache_test.go b/local_cache/cache_test.go similarity index 94% rename from file_cache/cache_test.go rename to local_cache/cache_test.go index 55a518465..261c6ac91 100644 --- a/file_cache/cache_test.go +++ b/local_cache/cache_test.go @@ -16,7 +16,7 @@ * ***************************************************************/ -package simple_cache_test +package local_cache_test import ( "context" @@ -33,8 +33,8 @@ import ( "github.com/pelicanplatform/pelican/client" "github.com/pelicanplatform/pelican/config" - simple_cache "github.com/pelicanplatform/pelican/file_cache" "github.com/pelicanplatform/pelican/launchers" + local_cache "github.com/pelicanplatform/pelican/local_cache" "github.com/pelicanplatform/pelican/param" "github.com/pelicanplatform/pelican/test_utils" "github.com/pelicanplatform/pelican/token_scopes" @@ -154,10 +154,10 @@ func TestFedPublicGet(t *testing.T) { ft := fedTest{} ft.spinup(t, ctx, egrp) - sc, err := simple_cache.NewSimpleCache(ctx, egrp) + lc, err := local_cache.NewLocalCache(ctx, egrp) require.NoError(t, err) - reader, err := sc.Get("/test/hello_world.txt", "") + reader, err := lc.Get("/test/hello_world.txt", "") require.NoError(t, err) byteBuff, err := io.ReadAll(reader) @@ -165,7 +165,7 @@ func TestFedPublicGet(t *testing.T) { assert.Equal(t, "Hello, World!", string(byteBuff)) // Query again -- cache hit case - reader, err = sc.Get("/test/hello_world.txt", "") + reader, err = lc.Get("/test/hello_world.txt", "") require.NoError(t, err) assert.Equal(t, "*os.File", fmt.Sprintf("%T", reader)) @@ -183,7 +183,7 @@ func TestFedAuthGet(t *testing.T) { ft := fedTest{} ft.spinup(t, ctx, egrp) - lc, err := simple_cache.NewSimpleCache(ctx, egrp) + lc, err := local_cache.NewLocalCache(ctx, egrp) require.NoError(t, err) reader, err := lc.Get("/test/hello_world.txt", ft.token) @@ -223,7 +223,7 @@ func TestHttpReq(t *testing.T) { transport := config.GetTransport().Clone() transport.DialContext = func(_ context.Context, _, _ string) (net.Conn, error) { - return net.Dial("unix", param.FileCache_Socket.GetString()) + return net.Dial("unix", param.LocalCache_Socket.GetString()) } client := &http.Client{Transport: transport} @@ -251,7 +251,7 @@ func TestClient(t *testing.T) { cacheUrl := &url.URL{ Scheme: "unix", - Path: param.FileCache_Socket.GetString(), + Path: param.LocalCache_Socket.GetString(), } discoveryHost := param.Federation_DiscoveryUrl.GetString() @@ -278,7 +278,7 @@ func TestStat(t *testing.T) { ft := fedTest{} ft.spinup(t, ctx, egrp) - lc, err := simple_cache.NewSimpleCache(ctx, egrp) + lc, err := local_cache.NewLocalCache(ctx, egrp) require.NoError(t, err) size, err := lc.Stat("/test/hello_world.txt", "") @@ -309,7 +309,7 @@ func TestLargeFile(t *testing.T) { cacheUrl := &url.URL{ Scheme: "unix", - Path: param.FileCache_Socket.GetString(), + Path: param.LocalCache_Socket.GetString(), } fp, err := os.OpenFile(filepath.Join(ft.originDir, "hello_world.txt"), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600) diff --git a/file_cache/simple_cache.go b/local_cache/local_cache.go similarity index 94% rename from file_cache/simple_cache.go rename to local_cache/local_cache.go index b3b45bfc2..2557e81e5 100644 --- a/file_cache/simple_cache.go +++ b/local_cache/local_cache.go @@ -16,7 +16,7 @@ * ***************************************************************/ -package simple_cache +package local_cache import ( "container/heap" @@ -48,7 +48,7 @@ import ( ) type ( - SimpleCache struct { + LocalCache struct { ctx context.Context egrp *errgroup.Group te *client.TransferEngine @@ -104,7 +104,7 @@ type ( } cacheReader struct { - sc *SimpleCache + sc *LocalCache offset int64 path string token string @@ -209,15 +209,15 @@ func (ds *downloadStatus) String() string { } } -// Create a simple cache object +// Create a local cache object // // Launches background goroutines associated with the cache -func NewSimpleCache(ctx context.Context, egrp *errgroup.Group) (sc *SimpleCache, err error) { +func NewLocalCache(ctx context.Context, egrp *errgroup.Group) (sc *LocalCache, err error) { // Setup cache on disk - cacheDir := param.FileCache_DataLocation.GetString() + cacheDir := param.LocalCache_DataLocation.GetString() if cacheDir == "" { - err = errors.New("FileCache.DataLocation is not set; cannot determine where to place file cache's data") + err = errors.New("LocalCache.DataLocation is not set; cannot determine where to place file cache's data") return } if err = os.RemoveAll(cacheDir); err != nil { @@ -227,7 +227,7 @@ func NewSimpleCache(ctx context.Context, egrp *errgroup.Group) (sc *SimpleCache, return } - sizeStr := param.FileCache_Size.GetString() + sizeStr := param.LocalCache_Size.GetString() var cacheSize uint64 if sizeStr == "" || sizeStr == "0" { var stat syscall.Statfs_t @@ -238,21 +238,21 @@ func NewSimpleCache(ctx context.Context, egrp *errgroup.Group) (sc *SimpleCache, cacheSize = stat.Bavail * uint64(stat.Bsize) } else { var signedCacheSize int64 - signedCacheSize, err = units.ParseStrictBytes(param.FileCache_Size.GetString()) + signedCacheSize, err = units.ParseStrictBytes(param.LocalCache_Size.GetString()) if err != nil { return } cacheSize = uint64(signedCacheSize) } - highWaterPercentage := param.FileCache_HighWaterMarkPercentage.GetInt() - lowWaterPercentage := param.FileCache_LowWaterMarkPercentage.GetInt() + highWaterPercentage := param.LocalCache_HighWaterMarkPercentage.GetInt() + lowWaterPercentage := param.LocalCache_LowWaterMarkPercentage.GetInt() directorUrl, err := url.Parse(param.Federation_DirectorUrl.GetString()) if err != nil { return } - sc = &SimpleCache{ + sc = &LocalCache{ ctx: ctx, egrp: egrp, te: client.NewTransferEngine(ctx), @@ -291,7 +291,7 @@ func NewSimpleCache(ctx context.Context, egrp *errgroup.Group) (sc *SimpleCache, // // The TransferClient will invoke the callback as it progresses; // the callback info will be used to help the waiters progress. -func (sc *SimpleCache) callback(path string, downloaded int64, size int64, completed bool) { +func (sc *LocalCache) callback(path string, downloaded int64, size int64, completed bool) { ds := func() (ds *downloadStatus) { sc.mutex.RLock() defer sc.mutex.RUnlock() @@ -309,7 +309,7 @@ func (sc *SimpleCache) callback(path string, downloaded int64, size int64, compl } // The main goroutine for managing the cache and its requests -func (sc *SimpleCache) runMux() error { +func (sc *LocalCache) runMux() error { results := sc.tc.Results() type result struct { @@ -563,7 +563,7 @@ func (sc *SimpleCache) runMux() error { } } -func (sc *SimpleCache) purge() { +func (sc *LocalCache) purge() { heap.Init(&sc.lru) start := time.Now() for sc.cacheSize > sc.lowWater { @@ -587,7 +587,7 @@ func (sc *SimpleCache) purge() { // Given a URL, return a reader from the disk cache // // If there is no sentinal $NAME.DONE file, then returns nil -func (sc *SimpleCache) getFromDisk(localPath string) *os.File { +func (sc *LocalCache) getFromDisk(localPath string) *os.File { localPath = filepath.Join(sc.basePath, path.Clean(localPath)) fp, err := os.Open(localPath + ".DONE") if err != nil { @@ -600,7 +600,7 @@ func (sc *SimpleCache) getFromDisk(localPath string) *os.File { return nil } -func (sc *SimpleCache) newCacheReader(path, token string) (reader *cacheReader, err error) { +func (sc *LocalCache) newCacheReader(path, token string) (reader *cacheReader, err error) { reader = &cacheReader{ path: path, token: token, @@ -612,7 +612,7 @@ func (sc *SimpleCache) newCacheReader(path, token string) (reader *cacheReader, } // Get path from the cache -func (sc *SimpleCache) Get(path, token string) (io.ReadCloser, error) { +func (sc *LocalCache) Get(path, token string) (io.ReadCloser, error) { if !sc.ac.authorize(token_scopes.Storage_Read, path, token) { return nil, authorizationDenied } @@ -625,7 +625,7 @@ func (sc *SimpleCache) Get(path, token string) (io.ReadCloser, error) { } -func (lc *SimpleCache) Stat(path, token string) (uint64, error) { +func (lc *LocalCache) Stat(path, token string) (uint64, error) { if !lc.ac.authorize(token_scopes.Storage_Read, path, token) { return 0, authorizationDenied } @@ -645,7 +645,7 @@ func (lc *SimpleCache) Stat(path, token string) (uint64, error) { return client.DoStat(context.Background(), dUrl.String(), client.WithToken(token)) } -func (sc *SimpleCache) updateConfig() error { +func (sc *LocalCache) updateConfig() error { // Get the endpoint of the director var respNS []common.NamespaceAdV2 @@ -679,7 +679,7 @@ func (sc *SimpleCache) updateConfig() error { } // Periodically update the cache configuration from the registry -func (sc *SimpleCache) periodicUpdateConfig() error { +func (sc *LocalCache) periodicUpdateConfig() error { ticker := time.NewTicker(time.Minute) for { select { diff --git a/param/parameters.go b/param/parameters.go index 5b18ec302..e48427fab 100644 --- a/param/parameters.go +++ b/param/parameters.go @@ -90,10 +90,6 @@ var ( Federation_RegistryUrl = StringParam{"Federation.RegistryUrl"} Federation_TopologyNamespaceUrl = StringParam{"Federation.TopologyNamespaceUrl"} Federation_TopologyUrl = StringParam{"Federation.TopologyUrl"} - FileCache_DataLocation = StringParam{"FileCache.DataLocation"} - FileCache_RunLocation = StringParam{"FileCache.RunLocation"} - FileCache_Size = StringParam{"FileCache.Size"} - FileCache_Socket = StringParam{"FileCache.Socket"} IssuerKey = StringParam{"IssuerKey"} Issuer_AuthenticationSource = StringParam{"Issuer.AuthenticationSource"} Issuer_GroupFile = StringParam{"Issuer.GroupFile"} @@ -102,6 +98,10 @@ var ( Issuer_QDLLocation = StringParam{"Issuer.QDLLocation"} Issuer_ScitokensServerLocation = StringParam{"Issuer.ScitokensServerLocation"} Issuer_TomcatLocation = StringParam{"Issuer.TomcatLocation"} + LocalCache_DataLocation = StringParam{"LocalCache.DataLocation"} + LocalCache_RunLocation = StringParam{"LocalCache.RunLocation"} + LocalCache_Size = StringParam{"LocalCache.Size"} + LocalCache_Socket = StringParam{"LocalCache.Socket"} Logging_Cache_Ofs = StringParam{"Logging.Cache.Ofs"} Logging_Cache_Pss = StringParam{"Logging.Cache.Pss"} Logging_Cache_Scitokens = StringParam{"Logging.Cache.Scitokens"} @@ -206,8 +206,8 @@ var ( Director_MaxStatResponse = IntParam{"Director.MaxStatResponse"} Director_MinStatResponse = IntParam{"Director.MinStatResponse"} Director_StatConcurrencyLimit = IntParam{"Director.StatConcurrencyLimit"} - FileCache_HighWaterMarkPercentage = IntParam{"FileCache.HighWaterMarkPercentage"} - FileCache_LowWaterMarkPercentage = IntParam{"FileCache.LowWaterMarkPercentage"} + LocalCache_HighWaterMarkPercentage = IntParam{"LocalCache.HighWaterMarkPercentage"} + LocalCache_LowWaterMarkPercentage = IntParam{"LocalCache.LowWaterMarkPercentage"} MinimumDownloadSpeed = IntParam{"MinimumDownloadSpeed"} Monitoring_PortHigher = IntParam{"Monitoring.PortHigher"} Monitoring_PortLower = IntParam{"Monitoring.PortLower"} diff --git a/param/parameters_struct.go b/param/parameters_struct.go index a5d48cdb8..6531d90bc 100644 --- a/param/parameters_struct.go +++ b/param/parameters_struct.go @@ -73,14 +73,6 @@ type Config struct { TopologyReloadInterval time.Duration TopologyUrl string } - FileCache struct { - DataLocation string - HighWaterMarkPercentage int - LowWaterMarkPercentage int - RunLocation string - Size string - Socket string - } GeoIPOverrides interface{} Issuer struct { AuthenticationSource string @@ -95,6 +87,14 @@ type Config struct { TomcatLocation string } IssuerKey string + LocalCache struct { + DataLocation string + HighWaterMarkPercentage int + LowWaterMarkPercentage int + RunLocation string + Size string + Socket string + } Logging struct { Cache struct { Ofs string @@ -306,14 +306,6 @@ type configWithType struct { TopologyReloadInterval struct { Type string; Value time.Duration } TopologyUrl struct { Type string; Value string } } - FileCache struct { - DataLocation struct { Type string; Value string } - HighWaterMarkPercentage struct { Type string; Value int } - LowWaterMarkPercentage struct { Type string; Value int } - RunLocation struct { Type string; Value string } - Size struct { Type string; Value string } - Socket struct { Type string; Value string } - } GeoIPOverrides struct { Type string; Value interface{} } Issuer struct { AuthenticationSource struct { Type string; Value string } @@ -328,6 +320,14 @@ type configWithType struct { TomcatLocation struct { Type string; Value string } } IssuerKey struct { Type string; Value string } + LocalCache struct { + DataLocation struct { Type string; Value string } + HighWaterMarkPercentage struct { Type string; Value int } + LowWaterMarkPercentage struct { Type string; Value int } + RunLocation struct { Type string; Value string } + Size struct { Type string; Value string } + Socket struct { Type string; Value string } + } Logging struct { Cache struct { Ofs struct { Type string; Value string }