diff --git a/cmd/relay/action/worker/logger.go b/cmd/relay/action/worker/logger.go new file mode 100644 index 0000000..71974b7 --- /dev/null +++ b/cmd/relay/action/worker/logger.go @@ -0,0 +1,7 @@ +package worker + +import "git.ptzo.gdn/feditools/relay/internal/log" + +type empty struct{} + +var logger = log.WithPackageField(empty{}) diff --git a/cmd/relay/action/worker/start.go b/cmd/relay/action/worker/start.go new file mode 100644 index 0000000..f2a20c6 --- /dev/null +++ b/cmd/relay/action/worker/start.go @@ -0,0 +1,161 @@ +package worker + +import ( + "context" + "fmt" + "git.ptzo.gdn/feditools/relay/cmd/relay/action" + "git.ptzo.gdn/feditools/relay/internal/clock" + "git.ptzo.gdn/feditools/relay/internal/config" + "git.ptzo.gdn/feditools/relay/internal/db/bun" + "git.ptzo.gdn/feditools/relay/internal/db/cachemem" + "git.ptzo.gdn/feditools/relay/internal/http" + "git.ptzo.gdn/feditools/relay/internal/kv/redis" + "git.ptzo.gdn/feditools/relay/internal/logic/logic1" + "git.ptzo.gdn/feditools/relay/internal/metrics" + "git.ptzo.gdn/feditools/relay/internal/runner/faktory" + "git.ptzo.gdn/feditools/relay/internal/token" + "github.com/spf13/viper" + "github.com/uptrace/uptrace-go/uptrace" + "os" + "os/signal" + "syscall" +) + +// Start runs a worker migrations. +var Start action.Action = func(ctx context.Context) error { + l := logger.WithField("func", "Start") + l.Info("starting") + + ctx, cancel := context.WithCancel(ctx) + + // Configure OpenTelemetry with sensible defaults. + uptrace.ConfigureOpentelemetry( + // copy your project DSN here or use UPTRACE_DSN env var + //uptrace.WithDSN("https://@uptrace.dev/"), + + uptrace.WithServiceName(viper.GetString(config.Keys.ApplicationName)), + uptrace.WithServiceVersion(viper.GetString(config.Keys.SoftwareVersion)), + ) + // Send buffered spans and free resources. + defer func() { + l.Info("closing uptrace") + err := uptrace.Shutdown(context.Background()) + if err != nil { + l.Errorf("closing uptrace: %s", err.Error()) + } + }() + + // create metrics server + metricsServer := metrics.New(viper.GetString(config.Keys.MetricsHTTPBind)) + + // create clock module + l.Debug("creating clock") + clockMod := clock.NewClock() + + // create database client + l.Debug("creating database client") + dbClient, err := bun.New(ctx) + if err != nil { + l.Errorf("db: %s", err.Error()) + cancel() + + return err + } + dbCacheClient, err := cachemem.New(ctx, dbClient) + if err != nil { + l.Errorf("db: %s", err.Error()) + cancel() + + return err + } + defer func() { + err := dbCacheClient.Close(ctx) + if err != nil { + l.Errorf("closing db: %s", err.Error()) + } + }() + + // create http client + httpClient, err := http.NewClient(ctx) + if err != nil { + l.Errorf("http client: %s", err.Error()) + cancel() + + return err + } + + // create kv client + kvClient, err := redis.New(ctx) + if err != nil { + l.Errorf("redis: %s", err.Error()) + cancel() + + return err + } + defer func() { + err := kvClient.Close(ctx) + if err != nil { + l.Errorf("closing redis: %s", err.Error()) + } + }() + + // create tokenizer + tokz, err := token.New() + if err != nil { + l.Errorf("create tokenizer: %s", err.Error()) + cancel() + + return err + } + + // create logic module + l.Debug("creating logic module") + logicMod, err := logic1.New(ctx, clockMod, dbCacheClient, httpClient, kvClient, tokz) + if err != nil { + l.Errorf("logic: %s", err.Error()) + cancel() + + return err + } + + // create runner + runnerMod, err := faktory.New(logicMod) + if err != nil { + l.Errorf("runner: %s", err.Error()) + cancel() + + return err + } + logicMod.SetRunner(runnerMod) + runnerMod.Start(ctx) + + // ** start application ** + errChan := make(chan error) + + // Wait for SIGINT and SIGTERM (HIT CTRL-C) + stopSigChan := make(chan os.Signal) + signal.Notify(stopSigChan, syscall.SIGINT, syscall.SIGTERM) + + // start metrics server + go func(m *metrics.Module, errChan chan error) { + l.Debug("starting metrics server") + err := m.Start() + if err != nil { + errChan <- fmt.Errorf("metrics server: %s", err.Error()) + } + }(metricsServer, errChan) + + // wait for event + select { + case sig := <-stopSigChan: + l.Infof("got sig: %s", sig) + cancel() + case err := <-errChan: + l.Fatal(err.Error()) + cancel() + } + + <-ctx.Done() + l.Infof("done") + return nil +} diff --git a/cmd/relay/flag/worker.go b/cmd/relay/flag/worker.go new file mode 100644 index 0000000..808b615 --- /dev/null +++ b/cmd/relay/flag/worker.go @@ -0,0 +1,20 @@ +package flag + +import ( + "git.ptzo.gdn/feditools/relay/internal/config" + "github.com/spf13/cobra" +) + +// Worker adds all flags for running the server. +func Worker(cmd *cobra.Command, values config.Values) { + Redis(cmd, values) + + // server + cmd.PersistentFlags().String(config.Keys.ServerExternalHostname, values.ServerExternalHostname, usage.ServerExternalHostname) + + // runner + cmd.PersistentFlags().Int(config.Keys.RunnerConcurrency, values.RunnerConcurrency, usage.RunnerConcurrency) + + // metrics + cmd.PersistentFlags().String(config.Keys.MetricsHTTPBind, values.MetricsHTTPBind, usage.MetricsHTTPBind) +} diff --git a/cmd/relay/main.go b/cmd/relay/main.go index 21a93ba..1f1d218 100644 --- a/cmd/relay/main.go +++ b/cmd/relay/main.go @@ -49,8 +49,9 @@ func main() { // add commands rootCmd.AddCommand(accountCommands()) - rootCmd.AddCommand(serverCommands()) rootCmd.AddCommand(databaseCommands()) + rootCmd.AddCommand(serverCommands()) + rootCmd.AddCommand(workerCommands()) err = rootCmd.Execute() if err != nil { diff --git a/cmd/relay/worker.go b/cmd/relay/worker.go new file mode 100644 index 0000000..db58459 --- /dev/null +++ b/cmd/relay/worker.go @@ -0,0 +1,33 @@ +package main + +import ( + "git.ptzo.gdn/feditools/relay/cmd/relay/action/worker" + "git.ptzo.gdn/feditools/relay/cmd/relay/flag" + "git.ptzo.gdn/feditools/relay/internal/config" + "github.com/spf13/cobra" +) + +// workerCommands returns the 'worker' subcommand +func workerCommands() *cobra.Command { + workerCmd := &cobra.Command{ + Use: "worker", + Short: "controls a relay worker", + } + + workerStartCmd := &cobra.Command{ + Use: "start", + Short: "start the relay worker", + PreRunE: func(cmd *cobra.Command, args []string) error { + return preRun(cmd) + }, + RunE: func(cmd *cobra.Command, args []string) error { + return run(cmd.Context(), worker.Start) + }, + } + + flag.Worker(workerStartCmd, config.Defaults) + + workerCmd.AddCommand(workerStartCmd) + + return workerCmd +}