local cache (#180)

Reviewed-on: https://git.ptzo.gdn/feditools/relay/pulls/180
Co-authored-by: Tyr Mactire <tyr@pettingzoo.co>
Co-committed-by: Tyr Mactire <tyr@pettingzoo.co>
Authored by Tyr Mactire on 2022-12-31 05:00:22 +00:00; committed by PettingZoo Gitea
parent 9e3542beea
commit c6f54dd1b2
14 changed files with 1252 additions and 2 deletions

go.mod (1 line changed)

@@ -20,6 +20,7 @@ require (
github.com/hashicorp/golang-lru v0.5.4
github.com/jackc/pgconn v1.13.0
github.com/jackc/pgx/v4 v4.17.2
+github.com/jellydator/ttlcache/v3 v3.0.0
github.com/nickname76/telegrambot v1.2.2
github.com/nicksnyder/go-i18n/v2 v2.2.1
github.com/prometheus/client_golang v1.14.0

go.sum (2 lines changed)

@@ -306,6 +306,8 @@ github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0f
github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v1.3.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
+github.com/jellydator/ttlcache/v3 v3.0.0 h1:zmFhqrB/4sKiEiJHhtseJsNRE32IMVmJSs4++4gaQO4=
+github.com/jellydator/ttlcache/v3 v3.0.0/go.mod h1:WwTaEmcXQ3MTjOm4bsZoDFiCu/hMvNWLO1w67RXz6h4=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=


@@ -40,7 +40,7 @@ func New(d db.DB, t *fedihelper.Transport, k kv.KV, tok *token.Tokenizer) (*Modu
tokz: tok,
}
-fedi, err := fedihelper.New(k, t, appName, fediHelpers, viper.GetInt(config.Keys.HTTPClientTimeout))
+fedi, err := fedihelper.New(NewMemory(k), t, appName, fediHelpers, viper.GetInt(config.Keys.HTTPClientTimeout))
if err != nil {
return nil, err
}

internal/fedi/memory.go (new file, 123 lines)

@@ -0,0 +1,123 @@
package fedi
import (
"context"
"fmt"
"git.ptzo.gdn/feditools/go-lib/fedihelper"
"git.ptzo.gdn/feditools/relay/internal/kv"
"github.com/jellydator/ttlcache/v3"
"time"
)
var (
ErrNil = fmt.Errorf("nil")
)
// NewMemory creates a new memory cache.
func NewMemory(k kv.KV) *Cache {
actor := ttlcache.New[string, []byte](
ttlcache.WithDisableTouchOnHit[string, []byte](),
)
go actor.Start()
hostMeta := ttlcache.New[string, []byte](
ttlcache.WithDisableTouchOnHit[string, []byte](),
)
go hostMeta.Start()
nodeInfo := ttlcache.New[string, []byte](
ttlcache.WithDisableTouchOnHit[string, []byte](),
)
go nodeInfo.Start()
return &Cache{
kv: k,
actor: actor,
hostMeta: hostMeta,
nodeInfo: nodeInfo,
}
}
type Cache struct {
kv kv.KV
actor *ttlcache.Cache[string, []byte]
hostMeta *ttlcache.Cache[string, []byte]
nodeInfo *ttlcache.Cache[string, []byte]
}
var _ fedihelper.KV = (*Cache)(nil)
func (c *Cache) DeleteAccessToken(ctx context.Context, accountID int64) error {
return c.kv.DeleteAccessToken(ctx, accountID)
}
func (c *Cache) GetAccessToken(ctx context.Context, accountID int64) (string, error) {
return c.kv.GetAccessToken(ctx, accountID)
}
func (c *Cache) SetAccessToken(ctx context.Context, accountID int64, accessToken string) error {
return c.kv.SetAccessToken(ctx, accountID, accessToken)
}
func (c *Cache) DeleteActor(_ context.Context, actorURI string) error {
c.actor.Delete(actorURI)
return nil
}
func (c *Cache) GetActor(_ context.Context, actorURI string) ([]byte, error) {
cached := c.actor.Get(actorURI)
if cached == nil {
return nil, ErrNil
}
return cached.Value(), nil
}
func (c *Cache) SetActor(_ context.Context, actorURI string, actor []byte, expire time.Duration) error {
c.actor.Set(actorURI, actor, expire)
return nil
}
func (c *Cache) DeleteHostMeta(_ context.Context, domain string) error {
c.hostMeta.Delete(domain)
return nil
}
func (c *Cache) GetHostMeta(_ context.Context, domain string) ([]byte, error) {
cached := c.hostMeta.Get(domain)
if cached == nil {
return nil, ErrNil
}
return cached.Value(), nil
}
func (c *Cache) SetHostMeta(_ context.Context, domain string, hostmeta []byte, expire time.Duration) error {
c.hostMeta.Set(domain, hostmeta, expire)
return nil
}
func (c *Cache) DeleteFediNodeInfo(_ context.Context, domain string) error {
c.nodeInfo.Delete(domain)
return nil
}
func (c *Cache) GetFediNodeInfo(_ context.Context, domain string) ([]byte, error) {
cached := c.nodeInfo.Get(domain)
if cached == nil {
return nil, ErrNil
}
return cached.Value(), nil
}
func (c *Cache) SetFediNodeInfo(_ context.Context, domain string, nodeinfo []byte, expire time.Duration) error {
c.nodeInfo.Set(domain, nodeinfo, expire)
return nil
}
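
For context, a minimal, self-contained sketch of the caching pattern memory.go applies to actor documents (the same idea covers host-meta and nodeinfo): touch-on-hit is disabled, so each entry expires a fixed interval after it was stored, regardless of how often it is read in between. The URI, payload, and TTL below are illustrative only; the real TTL is supplied by the fedihelper caller.

```go
package main

import (
	"fmt"
	"time"

	"github.com/jellydator/ttlcache/v3"
)

func main() {
	// Same shape as the actor cache above: disabling touch-on-hit means an
	// entry's expiration is fixed at store time, not extended on each read.
	actors := ttlcache.New[string, []byte](
		ttlcache.WithDisableTouchOnHit[string, []byte](),
	)
	go actors.Start() // background deletion of expired entries
	defer actors.Stop()

	// Illustrative URI, payload, and TTL only.
	uri := "https://example.com/users/alice"
	actors.Set(uri, []byte(`{"type":"Person"}`), 10*time.Minute)

	if item := actors.Get(uri); item != nil {
		fmt.Println("hit:", string(item.Value())) // served from memory
	} else {
		fmt.Println("miss: fall back to the kv store or a remote fetch")
	}
}
```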


@@ -86,7 +86,7 @@ func (m *Module) inboxPostHandler(w nethttp.ResponseWriter, r *nethttp.Request)
return
}
default:
l.Errorf("unknown actor type: %s", actor.Type)
l.Errorf("unknown actor type: %s (%s)", actor.Type, actor.ID)
nethttp.Error(w, nethttp.StatusText(nethttp.StatusBadRequest), nethttp.StatusBadRequest)
return

vendor/github.com/jellydator/ttlcache/v3/CHANGELOG.md (generated, vendored, new file, 94 lines)

@@ -0,0 +1,94 @@
# 2.11.0 (December 2021)
#64: @DoubeDi added a method `GetItems` to retrieve all items in the cache. This method also triggers all callbacks associated with a normal `Get`
## API changes:
// GetItems returns a copy of all items in the cache. Returns nil when the cache has been closed.
func (cache *Cache) GetItems() map[string]interface{} {
# 2.10.0 (December 2021)
#62 : @nikhilk1701 found a memory leak where removed items are not directly eligible for garbage collection. There are no API changes.
# 2.9.0 (October 2021)
#55,#56,#57 : @chenyahui was on fire and greatly improved the performance of the library. He also got rid of the blocking call to expirationNotification, making the code run twice as fast in the benchmarks!
# 2.8.1 (September 2021)
#53 : Avoids recalculation of TTL value returned in API when TTL is extended. by @iczc
# 2.8.0 (August 2021)
#51 : The call GetWithTTL(key string) (interface{}, time.Duration, error) is added so that you can retrieve an item, and also know the remaining TTL. Thanks to @asgarciap for contributing.
# 2.7.0 (June 2021)
#46 : got panic
A panic occurred in a line that checks the maximum number of items in the cache. While no definite root cause has been found, there is indeed the possibility of crashing an empty cache if the cache limit is set to 'zero', which codes for infinite. This would lead to removal of the first item in the cache, which would panic on an empty cache.
Fixed this by applying the global cache lock to all configuration options as well.
# 2.6.0 (May 2021)
#44 : There are no API changes, but a contribution was made to use https://pkg.go.dev/golang.org/x/sync/singleflight as a way to provide everybody waiting for a key with that key when it's fetched.
This removes some complexity from the code and will make sure that all callers will get a return value even if there's high concurrency and low TTL (as proven by the test that was added).
# 2.5.0 (May 2021)
## API changes:
* #39 : Allow custom loader function for each key via `GetByLoader`
Introduce the `SimpleCache` interface for quick-start and basic usage.
# 2.4.0 (April 2021)
## API changes:
* #42 : Add option to get list of keys
* #40: Allow 'Touch' on items without other operation
// Touch resets the TTL of the key when it exists, returns ErrNotFound if the key is not present.
func (cache *Cache) Touch(key string) error
// GetKeys returns all keys of items in the cache. Returns nil when the cache has been closed.
func (cache *Cache) GetKeys() []string
# 2.3.0 (February 2021)
## API changes:
* #38: Added func (cache *Cache) SetExpirationReasonCallback(callback ExpireReasonCallback). This function will replace SetExpirationCallback(..) in the next major version.
# 2.2.0 (January 2021)
## API changes:
* #37 : a GetMetrics call is now available for some information on hits/misses etc.
* #34 : Errors are now const
# 2.1.0 (October 2020)
## API changes
* `SetCacheSizeLimit(limit int)` was contributed to set a cache size limit. #35
# 2.0.0 (July 2020)
## Fixes #29, #30, #31
## Behavioural changes
* `Remove(key)` now also calls the expiration callback when it's set
* `Count()` returns zero when the cache is closed
## API changes
* `SetLoaderFunction` allows you to provide a function to retrieve data on missing cache keys.
* Operations that affect item behaviour such as `Close`, `Set`, `SetWithTTL`, `Get`, `Remove`, `Purge` now return an error with standard errors `ErrClosed` and `ErrNotFound` instead of a bool or nothing
* `SkipTTLExtensionOnHit` replaces `SkipTtlExtensionOnHit` to satisfy golint
* The callback types are now exported

vendor/github.com/jellydator/ttlcache/v3/LICENSE (generated, vendored, new file, 21 lines)

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2022 Jellydator
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

vendor/github.com/jellydator/ttlcache/v3/README.md (generated, vendored, new file, 129 lines)

@@ -0,0 +1,129 @@
# TTLCache - an in-memory cache with item expiration
## Features
- Simple API.
- Type parameters.
- Item expiration and automatic deletion.
- Automatic expiration time extension on each `Get` call.
- `Loader` interface that is used to load/lazily initialize missing cache
items.
- Subscription to cache events (insertion and eviction).
- Metrics.
- Configurability.
## Installation
```
go get github.com/jellydator/ttlcache/v3
```
## Usage
The main type of `ttlcache` is `Cache`. It represents a single
in-memory data store.
To create a new instance of `ttlcache.Cache`, the `ttlcache.New()` function
should be called:
```go
func main() {
cache := ttlcache.New[string, string]()
}
```
Note that by default, a new cache instance does not let any of its
items expire or be automatically deleted. However, this feature
can be activated by passing a few additional options into the
`ttlcache.New()` function and calling the `cache.Start()` method:
```go
func main() {
cache := ttlcache.New[string, string](
ttlcache.WithTTL[string, string](30 * time.Minute),
)
go cache.Start() // starts automatic expired item deletion
}
```
Even though the `cache.Start()` method handles expired item deletion well,
there may be times when the system that uses `ttlcache` needs to determine
when to delete the expired items itself. For example, it may need to
delete them only when the resource load is at its lowest (e.g., after
midnight, when the number of users/HTTP requests drops). So, in situations
like these, instead of calling `cache.Start()`, the system could
periodically call `cache.DeleteExpired()`:
```go
func main() {
cache := ttlcache.New[string, string](
ttlcache.WithTTL[string, string](30 * time.Minute),
)
for {
time.Sleep(4 * time.Hour)
cache.DeleteExpired()
}
}
```
The data stored in `ttlcache.Cache` can be retrieved and updated with
`Set`, `Get`, `Delete`, etc. methods:
```go
func main() {
cache := ttlcache.New[string, string](
ttlcache.WithTTL[string, string](30 * time.Minute),
)
// insert data
cache.Set("first", "value1", ttlcache.DefaultTTL)
cache.Set("second", "value2", ttlcache.NoTTL)
cache.Set("third", "value3", ttlcache.DefaultTTL)
// retrieve data
item := cache.Get("first")
fmt.Println(item.Value(), item.ExpiresAt())
// delete data
cache.Delete("second")
cache.DeleteExpired()
cache.DeleteAll()
}
```
To subscribe to insertion and eviction events, `cache.OnInsertion()` and
`cache.OnEviction()` methods should be used:
```go
func main() {
cache := ttlcache.New[string, string](
ttlcache.WithTTL[string, string](30 * time.Minute),
ttlcache.WithCapacity[string, string](300),
)
cache.OnInsertion(func(item *ttlcache.Item[string, string]) {
fmt.Println(item.Value(), item.ExpiresAt())
})
cache.OnEviction(func(reason ttlcache.EvictionReason, item *ttlcache.Item[string, string]) {
if reason == ttlcache.EvictionReasonCapacityReached {
fmt.Println(item.Key(), item.Value())
}
})
cache.Set("first", "value1", ttlcache.DefaultTTL)
cache.DeleteAll()
}
```
To load data when the cache does not have it, a custom or
existing implementation of `ttlcache.Loader` can be used:
```go
func main() {
loader := ttlcache.LoaderFunc[string, string](
func(c *ttlcache.Cache[string, string], key string) *ttlcache.Item[string, string] {
// load from file/make an HTTP request
item := c.Set("key from file", "value from file")
return item
},
)
cache := ttlcache.New[string, string](
ttlcache.WithLoader[string, string](loader),
)
item := cache.Get("key from file")
}
```

vendor/github.com/jellydator/ttlcache/v3/cache.go (generated, vendored, new file, 574 lines)

@@ -0,0 +1,574 @@
package ttlcache
import (
"container/list"
"context"
"fmt"
"sync"
"time"
"golang.org/x/sync/singleflight"
)
// Available eviction reasons.
const (
EvictionReasonDeleted EvictionReason = iota + 1
EvictionReasonCapacityReached
EvictionReasonExpired
)
// EvictionReason is used to specify why a certain item was
// evicted/deleted.
type EvictionReason int
// Cache is a synchronised map of items that are automatically removed
// when they expire or the capacity is reached.
type Cache[K comparable, V any] struct {
items struct {
mu sync.RWMutex
values map[K]*list.Element
// a generic doubly linked list would be more convenient
// (and more performant?). It's possible that this
// will be introduced with/in go1.19+
lru *list.List
expQueue expirationQueue[K, V]
timerCh chan time.Duration
}
metricsMu sync.RWMutex
metrics Metrics
events struct {
insertion struct {
mu sync.RWMutex
nextID uint64
fns map[uint64]func(*Item[K, V])
}
eviction struct {
mu sync.RWMutex
nextID uint64
fns map[uint64]func(EvictionReason, *Item[K, V])
}
}
stopCh chan struct{}
options options[K, V]
}
// New creates a new instance of cache.
func New[K comparable, V any](opts ...Option[K, V]) *Cache[K, V] {
c := &Cache[K, V]{
stopCh: make(chan struct{}),
}
c.items.values = make(map[K]*list.Element)
c.items.lru = list.New()
c.items.expQueue = newExpirationQueue[K, V]()
c.items.timerCh = make(chan time.Duration, 1) // buffer is important
c.events.insertion.fns = make(map[uint64]func(*Item[K, V]))
c.events.eviction.fns = make(map[uint64]func(EvictionReason, *Item[K, V]))
applyOptions(&c.options, opts...)
return c
}
// updateExpirations updates the expiration queue and notifies
// the cache auto cleaner if needed.
// Not concurrently safe.
func (c *Cache[K, V]) updateExpirations(fresh bool, elem *list.Element) {
var oldExpiresAt time.Time
if !c.items.expQueue.isEmpty() {
oldExpiresAt = c.items.expQueue[0].Value.(*Item[K, V]).expiresAt
}
if fresh {
c.items.expQueue.push(elem)
} else {
c.items.expQueue.update(elem)
}
newExpiresAt := c.items.expQueue[0].Value.(*Item[K, V]).expiresAt
// check if the closest/soonest expiration timestamp changed
if newExpiresAt.IsZero() || (!oldExpiresAt.IsZero() && !newExpiresAt.Before(oldExpiresAt)) {
return
}
d := time.Until(newExpiresAt)
// It's possible that the auto cleaner isn't active or
// is busy, so we need to drain the channel before
// sending a new value.
// Also, since this method is called after locking the items' mutex,
// we can be sure that there is no other concurrent call of this
// method
if len(c.items.timerCh) > 0 {
// we need to drain this channel in a select with a default
// case because it's possible that the auto cleaner
// read this channel just after we entered this if
select {
case d1 := <-c.items.timerCh:
if d1 < d {
d = d1
}
default:
}
}
// since the channel has a size 1 buffer, we can be sure
// that the line below won't block (we can't overfill the buffer
// because we just drained it)
c.items.timerCh <- d
}
// set creates a new item, adds it to the cache and then returns it.
// Not concurrently safe.
func (c *Cache[K, V]) set(key K, value V, ttl time.Duration) *Item[K, V] {
if ttl == DefaultTTL {
ttl = c.options.ttl
}
elem := c.get(key, false)
if elem != nil {
// update/overwrite an existing item
item := elem.Value.(*Item[K, V])
item.update(value, ttl)
c.updateExpirations(false, elem)
return item
}
if c.options.capacity != 0 && uint64(len(c.items.values)) >= c.options.capacity {
// delete the oldest item
c.evict(EvictionReasonCapacityReached, c.items.lru.Back())
}
// create a new item
item := newItem(key, value, ttl)
elem = c.items.lru.PushFront(item)
c.items.values[key] = elem
c.updateExpirations(true, elem)
c.metricsMu.Lock()
c.metrics.Insertions++
c.metricsMu.Unlock()
c.events.insertion.mu.RLock()
for _, fn := range c.events.insertion.fns {
fn(item)
}
c.events.insertion.mu.RUnlock()
return item
}
// get retrieves an item from the cache and extends its expiration
// time if 'touch' is set to true.
// It returns nil if the item is not found or is expired.
// Not concurrently safe.
func (c *Cache[K, V]) get(key K, touch bool) *list.Element {
elem := c.items.values[key]
if elem == nil {
return nil
}
item := elem.Value.(*Item[K, V])
if item.isExpiredUnsafe() {
return nil
}
c.items.lru.MoveToFront(elem)
if touch && item.ttl > 0 {
item.touch()
c.updateExpirations(false, elem)
}
return elem
}
// evict deletes items from the cache.
// If no items are provided, all currently present cache items
// are evicted.
// Not concurrently safe.
func (c *Cache[K, V]) evict(reason EvictionReason, elems ...*list.Element) {
if len(elems) > 0 {
c.metricsMu.Lock()
c.metrics.Evictions += uint64(len(elems))
c.metricsMu.Unlock()
c.events.eviction.mu.RLock()
for i := range elems {
item := elems[i].Value.(*Item[K, V])
delete(c.items.values, item.key)
c.items.lru.Remove(elems[i])
c.items.expQueue.remove(elems[i])
for _, fn := range c.events.eviction.fns {
fn(reason, item)
}
}
c.events.eviction.mu.RUnlock()
return
}
c.metricsMu.Lock()
c.metrics.Evictions += uint64(len(c.items.values))
c.metricsMu.Unlock()
c.events.eviction.mu.RLock()
for _, elem := range c.items.values {
item := elem.Value.(*Item[K, V])
for _, fn := range c.events.eviction.fns {
fn(reason, item)
}
}
c.events.eviction.mu.RUnlock()
c.items.values = make(map[K]*list.Element)
c.items.lru.Init()
c.items.expQueue = newExpirationQueue[K, V]()
}
// Set creates a new item from the provided key and value, adds
// it to the cache and then returns it. If an item associated with the
// provided key already exists, the new item overwrites the existing one.
func (c *Cache[K, V]) Set(key K, value V, ttl time.Duration) *Item[K, V] {
c.items.mu.Lock()
defer c.items.mu.Unlock()
return c.set(key, value, ttl)
}
// Get retrieves an item from the cache by the provided key.
// Unless this is disabled, it also extends/touches an item's
// expiration timestamp on successful retrieval.
// If the item is not found, a nil value is returned.
func (c *Cache[K, V]) Get(key K, opts ...Option[K, V]) *Item[K, V] {
getOpts := options[K, V]{
loader: c.options.loader,
disableTouchOnHit: c.options.disableTouchOnHit,
}
applyOptions(&getOpts, opts...)
c.items.mu.Lock()
elem := c.get(key, !getOpts.disableTouchOnHit)
c.items.mu.Unlock()
if elem == nil {
c.metricsMu.Lock()
c.metrics.Misses++
c.metricsMu.Unlock()
if getOpts.loader != nil {
return getOpts.loader.Load(c, key)
}
return nil
}
c.metricsMu.Lock()
c.metrics.Hits++
c.metricsMu.Unlock()
return elem.Value.(*Item[K, V])
}
// Delete deletes an item from the cache. If the item associated with
the key is not found, the method is a no-op.
func (c *Cache[K, V]) Delete(key K) {
c.items.mu.Lock()
defer c.items.mu.Unlock()
elem := c.items.values[key]
if elem == nil {
return
}
c.evict(EvictionReasonDeleted, elem)
}
// DeleteAll deletes all items from the cache.
func (c *Cache[K, V]) DeleteAll() {
c.items.mu.Lock()
c.evict(EvictionReasonDeleted)
c.items.mu.Unlock()
}
// DeleteExpired deletes all expired items from the cache.
func (c *Cache[K, V]) DeleteExpired() {
c.items.mu.Lock()
defer c.items.mu.Unlock()
if c.items.expQueue.isEmpty() {
return
}
e := c.items.expQueue[0]
for e.Value.(*Item[K, V]).isExpiredUnsafe() {
c.evict(EvictionReasonExpired, e)
if c.items.expQueue.isEmpty() {
break
}
// expiration queue has a new root
e = c.items.expQueue[0]
}
}
// Touch simulates an item's retrieval without actually returning it.
// Its main purpose is to extend an item's expiration timestamp.
If the item is not found, the method is a no-op.
func (c *Cache[K, V]) Touch(key K) {
c.items.mu.Lock()
c.get(key, true)
c.items.mu.Unlock()
}
// Len returns the number of items in the cache.
func (c *Cache[K, V]) Len() int {
c.items.mu.RLock()
defer c.items.mu.RUnlock()
return len(c.items.values)
}
// Keys returns all keys currently present in the cache.
func (c *Cache[K, V]) Keys() []K {
c.items.mu.RLock()
defer c.items.mu.RUnlock()
res := make([]K, 0, len(c.items.values))
for k := range c.items.values {
res = append(res, k)
}
return res
}
// Items returns a copy of all items in the cache.
// It does not update any expiration timestamps.
func (c *Cache[K, V]) Items() map[K]*Item[K, V] {
c.items.mu.RLock()
defer c.items.mu.RUnlock()
items := make(map[K]*Item[K, V], len(c.items.values))
for k := range c.items.values {
item := c.get(k, false)
if item != nil {
items[k] = item.Value.(*Item[K, V])
}
}
return items
}
// Metrics returns the metrics of the cache.
func (c *Cache[K, V]) Metrics() Metrics {
c.metricsMu.RLock()
defer c.metricsMu.RUnlock()
return c.metrics
}
// Start starts an automatic cleanup process that
// periodically deletes expired items.
// It blocks until Stop is called.
func (c *Cache[K, V]) Start() {
waitDur := func() time.Duration {
c.items.mu.RLock()
defer c.items.mu.RUnlock()
if !c.items.expQueue.isEmpty() &&
!c.items.expQueue[0].Value.(*Item[K, V]).expiresAt.IsZero() {
d := time.Until(c.items.expQueue[0].Value.(*Item[K, V]).expiresAt)
if d <= 0 {
// execute immediately
return time.Microsecond
}
return d
}
if c.options.ttl > 0 {
return c.options.ttl
}
return time.Hour
}
timer := time.NewTimer(waitDur())
stop := func() {
if !timer.Stop() {
// drain the timer chan
select {
case <-timer.C:
default:
}
}
}
defer stop()
for {
select {
case <-c.stopCh:
return
case d := <-c.items.timerCh:
stop()
timer.Reset(d)
case <-timer.C:
c.DeleteExpired()
stop()
timer.Reset(waitDur())
}
}
}
// Stop stops the automatic cleanup process.
// It blocks until the cleanup process exits.
func (c *Cache[K, V]) Stop() {
c.stopCh <- struct{}{}
}
// OnInsertion adds the provided function to be executed when
// a new item is inserted into the cache. The function is executed
// on a separate goroutine and does not block the flow of the cache
// manager.
// The returned function may be called to delete the subscription function
// from the list of insertion subscribers.
// When the returned function is called, it blocks until all instances of
// the same subscription function return. A context is used to notify the
// subscription function when the returned/deletion function is called.
func (c *Cache[K, V]) OnInsertion(fn func(context.Context, *Item[K, V])) func() {
var (
wg sync.WaitGroup
ctx, cancel = context.WithCancel(context.Background())
)
c.events.insertion.mu.Lock()
id := c.events.insertion.nextID
c.events.insertion.fns[id] = func(item *Item[K, V]) {
wg.Add(1)
go func() {
fn(ctx, item)
wg.Done()
}()
}
c.events.insertion.nextID++
c.events.insertion.mu.Unlock()
return func() {
cancel()
c.events.insertion.mu.Lock()
delete(c.events.insertion.fns, id)
c.events.insertion.mu.Unlock()
wg.Wait()
}
}
// OnEviction adds the provided function to be executed when
// an item is evicted/deleted from the cache. The function is executed
// on a separate goroutine and does not block the flow of the cache
// manager.
// The returned function may be called to delete the subscription function
// from the list of eviction subscribers.
// When the returned function is called, it blocks until all instances of
// the same subscription function return. A context is used to notify the
// subscription function when the returned/deletion function is called.
func (c *Cache[K, V]) OnEviction(fn func(context.Context, EvictionReason, *Item[K, V])) func() {
var (
wg sync.WaitGroup
ctx, cancel = context.WithCancel(context.Background())
)
c.events.eviction.mu.Lock()
id := c.events.eviction.nextID
c.events.eviction.fns[id] = func(r EvictionReason, item *Item[K, V]) {
wg.Add(1)
go func() {
fn(ctx, r, item)
wg.Done()
}()
}
c.events.eviction.nextID++
c.events.eviction.mu.Unlock()
return func() {
cancel()
c.events.eviction.mu.Lock()
delete(c.events.eviction.fns, id)
c.events.eviction.mu.Unlock()
wg.Wait()
}
}
// Loader is an interface that handles missing data loading.
type Loader[K comparable, V any] interface {
// Load should execute a custom item retrieval logic and
// return the item that is associated with the key.
// It should return nil if the item is not found/valid.
// The method is allowed to fetch data from the cache instance
// or update it for future use.
Load(c *Cache[K, V], key K) *Item[K, V]
}
// LoaderFunc type is an adapter that allows the use of ordinary
// functions as data loaders.
type LoaderFunc[K comparable, V any] func(*Cache[K, V], K) *Item[K, V]
// Load executes a custom item retrieval logic and returns the item that
// is associated with the key.
// It returns nil if the item is not found/valid.
func (l LoaderFunc[K, V]) Load(c *Cache[K, V], key K) *Item[K, V] {
return l(c, key)
}
// SuppressedLoader wraps another Loader and suppresses duplicate
// calls to its Load method.
type SuppressedLoader[K comparable, V any] struct {
Loader[K, V]
group *singleflight.Group
}
// Load executes a custom item retrieval logic and returns the item that
// is associated with the key.
// It returns nil if the item is not found/valid.
// It also ensures that only one execution of the wrapped Loader's Load
// method is in-flight for a given key at a time.
func (l *SuppressedLoader[K, V]) Load(c *Cache[K, V], key K) *Item[K, V] {
// there should be a better/generic way to create a
// singleflight Group's key. It's possible that a generic
// singleflight.Group will be introduced with/in go1.19+
strKey := fmt.Sprint(key)
// the error can be discarded since the singleflight.Group
// itself does not return any of its errors, it returns
// the error that we return ourselves in the func below, which
// is also nil
res, _, _ := l.group.Do(strKey, func() (interface{}, error) {
item := l.Loader.Load(c, key)
if item == nil {
return nil, nil
}
return item, nil
})
if res == nil {
return nil
}
return res.(*Item[K, V])
}
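
As the Get implementation above shows, options passed to Get override the cache-level defaults for that single call. A small sketch of this; the key, value, and TTL are placeholders:

```go
package main

import (
	"fmt"
	"time"

	"github.com/jellydator/ttlcache/v3"
)

func main() {
	cache := ttlcache.New[string, string](
		ttlcache.WithTTL[string, string](5 * time.Minute),
	)
	cache.Set("session", "abc123", ttlcache.DefaultTTL)

	// Read the value without extending its expiration timestamp: the option
	// passed to Get overrides the cache-level default for this call only.
	if item := cache.Get("session", ttlcache.WithDisableTouchOnHit[string, string]()); item != nil {
		fmt.Println(item.Value(), "expires at", item.ExpiresAt())
	}
}
```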


@@ -0,0 +1,85 @@
package ttlcache
import (
"container/heap"
"container/list"
)
// expirationQueue stores items that are ordered by their expiration
// timestamps. The 0th item is closest to its expiration.
type expirationQueue[K comparable, V any] []*list.Element
// newExpirationQueue creates and initializes a new expiration queue.
func newExpirationQueue[K comparable, V any]() expirationQueue[K, V] {
q := make(expirationQueue[K, V], 0)
heap.Init(&q)
return q
}
// isEmpty checks if the queue is empty.
func (q expirationQueue[K, V]) isEmpty() bool {
return q.Len() == 0
}
// update updates an existing item's value and position in the queue.
func (q *expirationQueue[K, V]) update(elem *list.Element) {
heap.Fix(q, elem.Value.(*Item[K, V]).queueIndex)
}
// push pushes a new item into the queue and updates the order of its
// elements.
func (q *expirationQueue[K, V]) push(elem *list.Element) {
heap.Push(q, elem)
}
// remove removes an item from the queue and updates the order of its
// elements.
func (q *expirationQueue[K, V]) remove(elem *list.Element) {
heap.Remove(q, elem.Value.(*Item[K, V]).queueIndex)
}
// Len returns the total number of items in the queue.
func (q expirationQueue[K, V]) Len() int {
return len(q)
}
// Less checks if the item at the i position expires sooner than
// the one at the j position.
func (q expirationQueue[K, V]) Less(i, j int) bool {
item1, item2 := q[i].Value.(*Item[K, V]), q[j].Value.(*Item[K, V])
if item1.expiresAt.IsZero() {
return false
}
if item2.expiresAt.IsZero() {
return true
}
return item1.expiresAt.Before(item2.expiresAt)
}
// Swap switches the places of two queue items.
func (q expirationQueue[K, V]) Swap(i, j int) {
q[i], q[j] = q[j], q[i]
q[i].Value.(*Item[K, V]).queueIndex = i
q[j].Value.(*Item[K, V]).queueIndex = j
}
// Push appends a new item to the item slice.
func (q *expirationQueue[K, V]) Push(x interface{}) {
elem := x.(*list.Element)
elem.Value.(*Item[K, V]).queueIndex = len(*q)
*q = append(*q, elem)
}
// Pop removes and returns the last item.
func (q *expirationQueue[K, V]) Pop() interface{} {
old := *q
i := len(old) - 1
elem := old[i]
elem.Value.(*Item[K, V]).queueIndex = -1
old[i] = nil // avoid memory leak
*q = old[:i]
return elem
}

vendor/github.com/jellydator/ttlcache/v3/item.go (generated, vendored, new file, 130 lines)

@@ -0,0 +1,130 @@
package ttlcache
import (
"sync"
"time"
)
const (
// NoTTL indicates that an item should never expire.
NoTTL time.Duration = -1
// DefaultTTL indicates that the default TTL
// value should be used.
DefaultTTL time.Duration = 0
)
// Item holds all the information that is associated with a single
// cache value.
type Item[K comparable, V any] struct {
// the mutex needs to be locked only when:
// - data fields are being read inside accessor methods
// - data fields are being updated
// when data fields are being read in one of the cache's
// methods, we can be sure that these fields are not modified in
// parallel since the item list is locked by its own mutex as
// well, so locking this mutex would be redundant.
// In other words, this mutex is only useful when these fields
// are being read from the outside (e.g. in event functions).
mu sync.RWMutex
key K
value V
ttl time.Duration
expiresAt time.Time
queueIndex int
}
// newItem creates a new cache item.
func newItem[K comparable, V any](key K, value V, ttl time.Duration) *Item[K, V] {
item := &Item[K, V]{
key: key,
value: value,
ttl: ttl,
}
item.touch()
return item
}
// update modifies the item's value and TTL.
func (item *Item[K, V]) update(value V, ttl time.Duration) {
item.mu.Lock()
defer item.mu.Unlock()
item.value = value
item.ttl = ttl
// reset expiration timestamp because the new TTL may be
// 0 or below
item.expiresAt = time.Time{}
item.touchUnsafe()
}
// touch updates the item's expiration timestamp.
func (item *Item[K, V]) touch() {
item.mu.Lock()
defer item.mu.Unlock()
item.touchUnsafe()
}
// touchUnsafe updates the item's expiration timestamp without
// locking the mutex.
func (item *Item[K, V]) touchUnsafe() {
if item.ttl <= 0 {
return
}
item.expiresAt = time.Now().Add(item.ttl)
}
// IsExpired returns a bool value that indicates whether the item
// is expired.
func (item *Item[K, V]) IsExpired() bool {
item.mu.RLock()
defer item.mu.RUnlock()
return item.isExpiredUnsafe()
}
// isExpiredUnsafe returns a bool value that indicates whether the
// item is expired without locking the mutex.
func (item *Item[K, V]) isExpiredUnsafe() bool {
if item.ttl <= 0 {
return false
}
return item.expiresAt.Before(time.Now())
}
// Key returns the key of the item.
func (item *Item[K, V]) Key() K {
item.mu.RLock()
defer item.mu.RUnlock()
return item.key
}
// Value returns the value of the item.
func (item *Item[K, V]) Value() V {
item.mu.RLock()
defer item.mu.RUnlock()
return item.value
}
// TTL returns the TTL value of the item.
func (item *Item[K, V]) TTL() time.Duration {
item.mu.RLock()
defer item.mu.RUnlock()
return item.ttl
}
// ExpiresAt returns the expiration timestamp of the item.
func (item *Item[K, V]) ExpiresAt() time.Time {
item.mu.RLock()
defer item.mu.RUnlock()
return item.expiresAt
}

vendor/github.com/jellydator/ttlcache/v3/metrics.go (generated, vendored, new file, 21 lines)

@@ -0,0 +1,21 @@
package ttlcache
// Metrics contains common cache metrics calculated over the course
// of the cache's lifetime.
type Metrics struct {
// Insertions specifies how many items were inserted.
Insertions uint64
// Hits specifies how many items were successfully retrieved
// from the cache.
// Retrievals made with a loader function are not tracked.
Hits uint64
// Misses specifies how many items were not found in the cache.
// Retrievals made with a loader function are tracked as well.
Misses uint64
// Evictions specifies how many items were removed from the
// cache.
Evictions uint64
}
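
A brief sketch of reading these counters, for example to log a hit rate; the keys and values are placeholders:

```go
package main

import (
	"fmt"

	"github.com/jellydator/ttlcache/v3"
)

func main() {
	cache := ttlcache.New[string, int]()
	cache.Set("a", 1, ttlcache.NoTTL)
	cache.Get("a") // counted as a hit
	cache.Get("b") // counted as a miss

	m := cache.Metrics()
	if total := m.Hits + m.Misses; total > 0 {
		fmt.Printf("hits=%d misses=%d evictions=%d hit-rate=%.0f%%\n",
			m.Hits, m.Misses, m.Evictions, 100*float64(m.Hits)/float64(total))
	}
}
```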

vendor/github.com/jellydator/ttlcache/v3/options.go (generated, vendored, new file, 67 lines)

@@ -0,0 +1,67 @@
package ttlcache
import "time"
// Option sets a specific cache option.
type Option[K comparable, V any] interface {
apply(opts *options[K, V])
}
// optionFunc wraps a function and implements the Option interface.
type optionFunc[K comparable, V any] func(*options[K, V])
// apply calls the wrapped function.
func (fn optionFunc[K, V]) apply(opts *options[K, V]) {
fn(opts)
}
// options holds all available cache configuration options.
type options[K comparable, V any] struct {
capacity uint64
ttl time.Duration
loader Loader[K, V]
disableTouchOnHit bool
}
// applyOptions applies the provided option values to the option struct.
func applyOptions[K comparable, V any](v *options[K, V], opts ...Option[K, V]) {
for i := range opts {
opts[i].apply(v)
}
}
// WithCapacity sets the maximum capacity of the cache.
// It has no effect when passing into Get().
func WithCapacity[K comparable, V any](c uint64) Option[K, V] {
return optionFunc[K, V](func(opts *options[K, V]) {
opts.capacity = c
})
}
// WithTTL sets the TTL of the cache.
// It has no effect when passing into Get().
func WithTTL[K comparable, V any](ttl time.Duration) Option[K, V] {
return optionFunc[K, V](func(opts *options[K, V]) {
opts.ttl = ttl
})
}
// WithLoader sets the loader of the cache.
// When passing into Get(), it sets an ephemeral loader that
// is used instead of the cache's default one.
func WithLoader[K comparable, V any](l Loader[K, V]) Option[K, V] {
return optionFunc[K, V](func(opts *options[K, V]) {
opts.loader = l
})
}
// WithDisableTouchOnHit prevents the cache instance from
// extending/touching an item's expiration timestamp when it is being
// retrieved.
// When passing into Get(), it overrides the default value of the
// cache.
func WithDisableTouchOnHit[K comparable, V any]() Option[K, V] {
return optionFunc[K, V](func(opts *options[K, V]) {
opts.disableTouchOnHit = true
})
}
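
To illustrate the ephemeral per-call loader mentioned in the WithLoader comment above, a minimal sketch; the key and the loaded value are placeholders:

```go
package main

import (
	"fmt"

	"github.com/jellydator/ttlcache/v3"
)

func main() {
	cache := ttlcache.New[string, string]()

	// A loader passed to Get() is used for that call only, in place of any
	// loader configured on the cache itself.
	fallback := ttlcache.LoaderFunc[string, string](
		func(c *ttlcache.Cache[string, string], key string) *ttlcache.Item[string, string] {
			// Placeholder value; a real loader would fetch from disk, a
			// database, or the network.
			return c.Set(key, "loaded-on-demand", ttlcache.DefaultTTL)
		},
	)

	item := cache.Get("missing", ttlcache.WithLoader[string, string](fallback))
	fmt.Println(item.Value()) // prints "loaded-on-demand"
}
```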

vendor/modules.txt (vendored, 3 lines changed)

@@ -341,6 +341,9 @@ github.com/jackc/pgtype
github.com/jackc/pgx/v4
github.com/jackc/pgx/v4/internal/sanitize
github.com/jackc/pgx/v4/stdlib
+# github.com/jellydator/ttlcache/v3 v3.0.0
+## explicit; go 1.18
+github.com/jellydator/ttlcache/v3
# github.com/jinzhu/inflection v1.0.0
## explicit
github.com/jinzhu/inflection