don't deliver to unfollowed instances (#174)

Reviewed-on: https://git.ptzo.gdn/feditools/relay/pulls/174
Co-authored-by: Tyr Mactire <tyr@pettingzoo.co>
Co-committed-by: Tyr Mactire <tyr@pettingzoo.co>
This commit is contained in:
Tyr Mactire 2022-12-28 23:10:35 +00:00 committed by PettingZoo Gitea
parent c6240950fe
commit ec647bbdce
No known key found for this signature in database
GPG Key ID: 39788A4390A1372F
3 changed files with 65 additions and 21 deletions

View File

@ -15,7 +15,7 @@ type Logic interface {
Metrics
Scheduler
DeliverActivity(ctx context.Context, jid string, instanceID int64, activity fedihelper.Activity) error
DeliverActivity(ctx context.Context, jid string, instanceID int64, activity fedihelper.Activity, force bool) error
Domain() string
GetAccountConfigMap(ctx context.Context, keys ...models.ConfigKey) (*models.AccountConfigMap, error)
GetConfigMap(ctx context.Context, keys ...models.ConfigKey) (*models.ConfigMap, error)

View File

@ -11,11 +11,18 @@ import (
"git.ptzo.gdn/feditools/relay/internal/path"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"net/url"
"strconv"
)
func (l *Logic) DeliverActivity(ctx context.Context, jid string, instanceID int64, activity fedihelper.Activity) error {
func (l *Logic) DeliverActivity(ctx context.Context, jid string, instanceID int64, activity fedihelper.Activity, force bool) error {
ctx, tracer := l.tracer.Start(ctx, "DeliverActivity", trace.WithSpanKind(trace.SpanKindInternal), trace.WithAttributes(
attribute.Bool("force", force),
))
defer tracer.End()
log := logger.WithFields(logrus.Fields{
"func": "DeliverActivity",
"jid": jid,
@ -29,11 +36,22 @@ func (l *Logic) DeliverActivity(ctx context.Context, jid string, instanceID int6
return fmt.Errorf("db read: %s", err.Error())
}
// drop if paused
if instance.IsPaused {
log.Debugf("instance %s is paused, dropping delivery", instance.Domain)
if !force {
// drop if not following
if !instance.IsFollowing {
log.Debugf("instance %s is not following, dropping delivery", instance.Domain)
tracer.SetAttributes(attribute.Bool("skipped", true))
return nil
return nil
}
// drop if paused
if instance.IsPaused {
log.Debugf("instance %s is paused, dropping delivery", instance.Domain)
tracer.SetAttributes(attribute.Bool("skipped", true))
return nil
}
}
// send activity
@ -166,6 +184,9 @@ func (l *Logic) ProcessActivity(ctx context.Context, jid string, instanceID int6
}
func (l *Logic) doFollow(ctx context.Context, jid string, instance *models.Instance, activity fedihelper.Activity) error {
ctx, tracer := l.tracer.Start(ctx, "doFollow", trace.WithSpanKind(trace.SpanKindInternal))
defer tracer.End()
log := logger.WithFields(logrus.Fields{
"func": "doFollow",
"jid": jid,
@ -186,6 +207,20 @@ func (l *Logic) doFollow(ctx context.Context, jid string, instance *models.Insta
return errors.New("activity id is not string")
}
// skip already following
if instance.IsFollowing == true {
tracer.SetAttributes(attribute.Bool("skipped", true))
return nil
}
// send accept
outgoingActivity := genActivityAccept(l.domain, instance.ActorIRI, id)
err := l.DeliverActivity(ctx, jid, instance.ID, outgoingActivity, true)
if err != nil {
log.Errorf("enqueueing delivery: %s", err.Error())
}
// set followed
instance.IsFollowing = true
@ -237,13 +272,6 @@ func (l *Logic) doFollow(ctx context.Context, jid string, instance *models.Insta
return err
}
// send accept
outgoingActivity := genActivityAccept(l.domain, instance.ActorIRI, id)
err = l.runner.EnqueueDeliverActivity(ctx, instance.ID, outgoingActivity)
if err != nil {
log.Errorf("enqueueing delivery: %s", err.Error())
}
go l.runner.EnqueueSendNotification(
ctx,
models.EventInstanceFollow,
@ -256,6 +284,9 @@ func (l *Logic) doFollow(ctx context.Context, jid string, instance *models.Insta
}
func (l *Logic) doForward(ctx context.Context, jid string, instance *models.Instance, activity fedihelper.Activity) error {
ctx, tracer := l.tracer.Start(ctx, "doForward", trace.WithSpanKind(trace.SpanKindInternal))
defer tracer.End()
log := logger.WithFields(logrus.Fields{
"func": "doForward",
"jid": jid,
@ -303,6 +334,9 @@ func (l *Logic) doForward(ctx context.Context, jid string, instance *models.Inst
}
func (l *Logic) doRelay(ctx context.Context, jid string, instance *models.Instance, activity fedihelper.Activity) error {
ctx, tracer := l.tracer.Start(ctx, "doRelay", trace.WithSpanKind(trace.SpanKindInternal))
defer tracer.End()
log := logger.WithFields(logrus.Fields{
"func": "doRelay",
"jid": jid,
@ -358,6 +392,9 @@ func (l *Logic) doRelay(ctx context.Context, jid string, instance *models.Instan
}
func (l *Logic) doUndo(ctx context.Context, jid string, instance *models.Instance, activity fedihelper.Activity) error {
ctx, tracer := l.tracer.Start(ctx, "doUndo", trace.WithSpanKind(trace.SpanKindInternal))
defer tracer.End()
log := logger.WithFields(logrus.Fields{
"func": "doUndo",
"jid": jid,
@ -373,6 +410,20 @@ func (l *Logic) doUndo(ctx context.Context, jid string, instance *models.Instanc
case fedihelper.TypeAnnounce:
return l.doForward(ctx, jid, instance, activity)
case fedihelper.TypeFollow:
// skip already following
if instance.IsFollowing == false {
tracer.SetAttributes(attribute.Bool("skipped", true))
return nil
}
// send undo
outgoingActivity := genActivityUndo(l.domain, instance.ActorIRI)
err = l.DeliverActivity(ctx, jid, instance.ID, outgoingActivity, true)
if err != nil {
log.Errorf("enqueueing delivery: %s", err.Error())
}
instance.IsFollowing = false
// create log entries
@ -423,13 +474,6 @@ func (l *Logic) doUndo(ctx context.Context, jid string, instance *models.Instanc
return err
}
// send undo
outgoingActivity := genActivityUndo(l.domain, instance.ActorIRI)
err = l.runner.EnqueueDeliverActivity(ctx, instance.ID, outgoingActivity)
if err != nil {
log.Errorf("enqueueing delivery: %s", err.Error())
}
go l.runner.EnqueueSendNotification(
ctx,
models.EventInstanceUnfollow,

View File

@ -56,7 +56,7 @@ func (r *Runner) deliverActivity(ctx context.Context, args ...interface{}) error
return fmt.Errorf("argument 1 is not an activity")
}
return r.logic.DeliverActivity(ctx, help.Jid(), instanceID, activity)
return r.logic.DeliverActivity(ctx, help.Jid(), instanceID, activity, false)
}
func (r *Runner) EnqueueInboxActivity(_ context.Context, instanceID int64, actorIRI string, activity fedihelper.Activity) error {