diff --git a/internal/logic/logic.go b/internal/logic/logic.go index 2818371..95785e7 100644 --- a/internal/logic/logic.go +++ b/internal/logic/logic.go @@ -15,7 +15,7 @@ type Logic interface { Metrics Scheduler - DeliverActivity(ctx context.Context, jid string, instanceID int64, activity fedihelper.Activity) error + DeliverActivity(ctx context.Context, jid string, instanceID int64, activity fedihelper.Activity, force bool) error Domain() string GetAccountConfigMap(ctx context.Context, keys ...models.ConfigKey) (*models.AccountConfigMap, error) GetConfigMap(ctx context.Context, keys ...models.ConfigKey) (*models.ConfigMap, error) diff --git a/internal/logic/logic1/activity.go b/internal/logic/logic1/activity.go index 8ff1593..154ddad 100644 --- a/internal/logic/logic1/activity.go +++ b/internal/logic/logic1/activity.go @@ -11,11 +11,18 @@ import ( "git.ptzo.gdn/feditools/relay/internal/path" "github.com/google/uuid" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "net/url" "strconv" ) -func (l *Logic) DeliverActivity(ctx context.Context, jid string, instanceID int64, activity fedihelper.Activity) error { +func (l *Logic) DeliverActivity(ctx context.Context, jid string, instanceID int64, activity fedihelper.Activity, force bool) error { + ctx, tracer := l.tracer.Start(ctx, "DeliverActivity", trace.WithSpanKind(trace.SpanKindInternal), trace.WithAttributes( + attribute.Bool("force", force), + )) + defer tracer.End() + log := logger.WithFields(logrus.Fields{ "func": "DeliverActivity", "jid": jid, @@ -29,11 +36,22 @@ func (l *Logic) DeliverActivity(ctx context.Context, jid string, instanceID int6 return fmt.Errorf("db read: %s", err.Error()) } - // drop if paused - if instance.IsPaused { - log.Debugf("instance %s is paused, dropping delivery", instance.Domain) + if !force { + // drop if not following + if !instance.IsFollowing { + log.Debugf("instance %s is not following, dropping delivery", instance.Domain) + tracer.SetAttributes(attribute.Bool("skipped", true)) - return nil + return nil + } + + // drop if paused + if instance.IsPaused { + log.Debugf("instance %s is paused, dropping delivery", instance.Domain) + tracer.SetAttributes(attribute.Bool("skipped", true)) + + return nil + } } // send activity @@ -166,6 +184,9 @@ func (l *Logic) ProcessActivity(ctx context.Context, jid string, instanceID int6 } func (l *Logic) doFollow(ctx context.Context, jid string, instance *models.Instance, activity fedihelper.Activity) error { + ctx, tracer := l.tracer.Start(ctx, "doFollow", trace.WithSpanKind(trace.SpanKindInternal)) + defer tracer.End() + log := logger.WithFields(logrus.Fields{ "func": "doFollow", "jid": jid, @@ -186,6 +207,20 @@ func (l *Logic) doFollow(ctx context.Context, jid string, instance *models.Insta return errors.New("activity id is not string") } + // skip already following + if instance.IsFollowing == true { + tracer.SetAttributes(attribute.Bool("skipped", true)) + + return nil + } + + // send accept + outgoingActivity := genActivityAccept(l.domain, instance.ActorIRI, id) + err := l.DeliverActivity(ctx, jid, instance.ID, outgoingActivity, true) + if err != nil { + log.Errorf("enqueueing delivery: %s", err.Error()) + } + // set followed instance.IsFollowing = true @@ -237,13 +272,6 @@ func (l *Logic) doFollow(ctx context.Context, jid string, instance *models.Insta return err } - // send accept - outgoingActivity := genActivityAccept(l.domain, instance.ActorIRI, id) - err = l.runner.EnqueueDeliverActivity(ctx, instance.ID, outgoingActivity) - if err != nil { - log.Errorf("enqueueing delivery: %s", err.Error()) - } - go l.runner.EnqueueSendNotification( ctx, models.EventInstanceFollow, @@ -256,6 +284,9 @@ func (l *Logic) doFollow(ctx context.Context, jid string, instance *models.Insta } func (l *Logic) doForward(ctx context.Context, jid string, instance *models.Instance, activity fedihelper.Activity) error { + ctx, tracer := l.tracer.Start(ctx, "doForward", trace.WithSpanKind(trace.SpanKindInternal)) + defer tracer.End() + log := logger.WithFields(logrus.Fields{ "func": "doForward", "jid": jid, @@ -303,6 +334,9 @@ func (l *Logic) doForward(ctx context.Context, jid string, instance *models.Inst } func (l *Logic) doRelay(ctx context.Context, jid string, instance *models.Instance, activity fedihelper.Activity) error { + ctx, tracer := l.tracer.Start(ctx, "doRelay", trace.WithSpanKind(trace.SpanKindInternal)) + defer tracer.End() + log := logger.WithFields(logrus.Fields{ "func": "doRelay", "jid": jid, @@ -358,6 +392,9 @@ func (l *Logic) doRelay(ctx context.Context, jid string, instance *models.Instan } func (l *Logic) doUndo(ctx context.Context, jid string, instance *models.Instance, activity fedihelper.Activity) error { + ctx, tracer := l.tracer.Start(ctx, "doUndo", trace.WithSpanKind(trace.SpanKindInternal)) + defer tracer.End() + log := logger.WithFields(logrus.Fields{ "func": "doUndo", "jid": jid, @@ -373,6 +410,20 @@ func (l *Logic) doUndo(ctx context.Context, jid string, instance *models.Instanc case fedihelper.TypeAnnounce: return l.doForward(ctx, jid, instance, activity) case fedihelper.TypeFollow: + // skip already following + if instance.IsFollowing == false { + tracer.SetAttributes(attribute.Bool("skipped", true)) + + return nil + } + + // send undo + outgoingActivity := genActivityUndo(l.domain, instance.ActorIRI) + err = l.DeliverActivity(ctx, jid, instance.ID, outgoingActivity, true) + if err != nil { + log.Errorf("enqueueing delivery: %s", err.Error()) + } + instance.IsFollowing = false // create log entries @@ -423,13 +474,6 @@ func (l *Logic) doUndo(ctx context.Context, jid string, instance *models.Instanc return err } - // send undo - outgoingActivity := genActivityUndo(l.domain, instance.ActorIRI) - err = l.runner.EnqueueDeliverActivity(ctx, instance.ID, outgoingActivity) - if err != nil { - log.Errorf("enqueueing delivery: %s", err.Error()) - } - go l.runner.EnqueueSendNotification( ctx, models.EventInstanceUnfollow, diff --git a/internal/runner/faktory/activity.go b/internal/runner/faktory/activity.go index db1b463..c5635e1 100644 --- a/internal/runner/faktory/activity.go +++ b/internal/runner/faktory/activity.go @@ -56,7 +56,7 @@ func (r *Runner) deliverActivity(ctx context.Context, args ...interface{}) error return fmt.Errorf("argument 1 is not an activity") } - return r.logic.DeliverActivity(ctx, help.Jid(), instanceID, activity) + return r.logic.DeliverActivity(ctx, help.Jid(), instanceID, activity, false) } func (r *Runner) EnqueueInboxActivity(_ context.Context, instanceID int64, actorIRI string, activity fedihelper.Activity) error {