fix pool usage (#170)

Reviewed-on: https://git.ptzo.gdn/feditools/relay/pulls/170
Co-authored-by: Tyr Mactire <tyr@pettingzoo.co>
Co-committed-by: Tyr Mactire <tyr@pettingzoo.co>
This commit is contained in:
Tyr Mactire 2022-11-28 04:38:43 +00:00 committed by PettingZoo Gitea
parent e806385400
commit f3d26c147f
No known key found for this signature in database
GPG Key ID: 39788A4390A1372F
6 changed files with 27 additions and 45 deletions

View File

@ -14,11 +14,9 @@ func (r *Runner) EnqueueUpdateAccountInfo(_ context.Context, accountID int64) er
job := faktory.NewJob(JobUpdateAccountInfo, strconv.FormatInt(accountID, 10))
job.Retry = &retry
client, err := r.manager.Pool.Get()
if err != nil {
return err
}
return client.Push(job)
return r.manager.Pool.With(func(conn *faktory.Client) error {
return conn.Push(job)
})
}
func (r *Runner) updateAccountInfo(ctx context.Context, args ...interface{}) error {

View File

@ -15,11 +15,9 @@ func (r *Runner) EnqueueDeliverActivity(_ context.Context, instanceID int64, act
job := faktory.NewJob(JobDeliverActivity, strconv.FormatInt(instanceID, 10), activity)
job.Queue = QueueDelivery
client, err := r.manager.Pool.Get()
if err != nil {
return err
}
return client.Push(job)
return r.manager.Pool.With(func(conn *faktory.Client) error {
return conn.Push(job)
})
}
func (r *Runner) deliverActivity(ctx context.Context, args ...interface{}) error {
@ -62,11 +60,9 @@ func (r *Runner) EnqueueInboxActivity(_ context.Context, instanceID int64, actor
job := faktory.NewJob(JobInboxActivity, strconv.FormatInt(instanceID, 10), actorIRI, activity)
job.Queue = QueueDefault
client, err := r.manager.Pool.Get()
if err != nil {
return err
}
return client.Push(job)
return r.manager.Pool.With(func(conn *faktory.Client) error {
return conn.Push(job)
})
}
func (r *Runner) inboxActivity(ctx context.Context, args ...interface{}) error {

View File

@ -11,31 +11,25 @@ import (
func (r *Runner) EnqueueProcessBlockAdd(_ context.Context, blockID int64) error {
job := faktory.NewJob(JobProcessBlockAdd, strconv.FormatInt(blockID, 10))
client, err := r.manager.Pool.Get()
if err != nil {
return err
}
return client.Push(job)
return r.manager.Pool.With(func(conn *faktory.Client) error {
return conn.Push(job)
})
}
func (r *Runner) EnqueueProcessBlockDelete(_ context.Context, blockID int64) error {
job := faktory.NewJob(JobProcessBlockDelete, strconv.FormatInt(blockID, 10))
client, err := r.manager.Pool.Get()
if err != nil {
return err
}
return client.Push(job)
return r.manager.Pool.With(func(conn *faktory.Client) error {
return conn.Push(job)
})
}
func (r *Runner) EnqueueProcessBlockUpdate(_ context.Context, blockID int64) error {
job := faktory.NewJob(JobProcessBlockUpdate, strconv.FormatInt(blockID, 10))
client, err := r.manager.Pool.Get()
if err != nil {
return err
}
return client.Push(job)
return r.manager.Pool.With(func(conn *faktory.Client) error {
return conn.Push(job)
})
}
func (r *Runner) processBlockAdd(ctx context.Context, args ...interface{}) error {

View File

@ -14,11 +14,9 @@ func (r *Runner) EnqueueUpdateInstanceInfo(_ context.Context, instanceID int64)
job := faktory.NewJob(JobUpdateInstanceInfo, strconv.FormatInt(instanceID, 10))
job.Retry = &retry
client, err := r.manager.Pool.Get()
if err != nil {
return err
}
return client.Push(job)
return r.manager.Pool.With(func(conn *faktory.Client) error {
return conn.Push(job)
})
}
func (r *Runner) updateInstanceInfo(ctx context.Context, args ...interface{}) error {

View File

@ -11,11 +11,9 @@ func (r *Runner) EnqueueMaintDeliveryErrorTimeout(_ context.Context) error {
job := faktory.NewJob(JobMaintDeliveryErrorTimeout)
job.Args = []interface{}{}
client, err := r.manager.Pool.Get()
if err != nil {
return err
}
return client.Push(job)
return r.manager.Pool.With(func(conn *faktory.Client) error {
return conn.Push(job)
})
}
func (r *Runner) maintDeliveryErrorTimeout(ctx context.Context, args ...interface{}) error {

View File

@ -13,11 +13,9 @@ func (r *Runner) EnqueueSendNotification(_ context.Context, event models.EventTy
job := faktory.NewJob(JobSendNotification, event, metadata)
job.Queue = QueuePriority
client, err := r.manager.Pool.Get()
if err != nil {
return err
}
return client.Push(job)
return r.manager.Pool.With(func(conn *faktory.Client) error {
return conn.Push(job)
})
}
func (r *Runner) sendNotification(ctx context.Context, args ...interface{}) error {