mirror of
https://git.ptzo.gdn/feditools/relay.git
synced 2024-09-21 09:27:12 +00:00
worker (#166)
Reviewed-on: https://git.ptzo.gdn/feditools/relay/pulls/166 Co-authored-by: Tyr Mactire <tyr@pettingzoo.co> Co-committed-by: Tyr Mactire <tyr@pettingzoo.co>
This commit is contained in:
parent
daefe097f6
commit
7c1aa35a48
7
cmd/relay/action/worker/logger.go
Normal file
7
cmd/relay/action/worker/logger.go
Normal file
@ -0,0 +1,7 @@
|
||||
package worker
|
||||
|
||||
import "git.ptzo.gdn/feditools/relay/internal/log"
|
||||
|
||||
type empty struct{}
|
||||
|
||||
var logger = log.WithPackageField(empty{})
|
161
cmd/relay/action/worker/start.go
Normal file
161
cmd/relay/action/worker/start.go
Normal file
@ -0,0 +1,161 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"git.ptzo.gdn/feditools/relay/cmd/relay/action"
|
||||
"git.ptzo.gdn/feditools/relay/internal/clock"
|
||||
"git.ptzo.gdn/feditools/relay/internal/config"
|
||||
"git.ptzo.gdn/feditools/relay/internal/db/bun"
|
||||
"git.ptzo.gdn/feditools/relay/internal/db/cachemem"
|
||||
"git.ptzo.gdn/feditools/relay/internal/http"
|
||||
"git.ptzo.gdn/feditools/relay/internal/kv/redis"
|
||||
"git.ptzo.gdn/feditools/relay/internal/logic/logic1"
|
||||
"git.ptzo.gdn/feditools/relay/internal/metrics"
|
||||
"git.ptzo.gdn/feditools/relay/internal/runner/faktory"
|
||||
"git.ptzo.gdn/feditools/relay/internal/token"
|
||||
"github.com/spf13/viper"
|
||||
"github.com/uptrace/uptrace-go/uptrace"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
// Start runs a worker migrations.
|
||||
var Start action.Action = func(ctx context.Context) error {
|
||||
l := logger.WithField("func", "Start")
|
||||
l.Info("starting")
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
// Configure OpenTelemetry with sensible defaults.
|
||||
uptrace.ConfigureOpentelemetry(
|
||||
// copy your project DSN here or use UPTRACE_DSN env var
|
||||
//uptrace.WithDSN("https://<token>@uptrace.dev/<project_id>"),
|
||||
|
||||
uptrace.WithServiceName(viper.GetString(config.Keys.ApplicationName)),
|
||||
uptrace.WithServiceVersion(viper.GetString(config.Keys.SoftwareVersion)),
|
||||
)
|
||||
// Send buffered spans and free resources.
|
||||
defer func() {
|
||||
l.Info("closing uptrace")
|
||||
err := uptrace.Shutdown(context.Background())
|
||||
if err != nil {
|
||||
l.Errorf("closing uptrace: %s", err.Error())
|
||||
}
|
||||
}()
|
||||
|
||||
// create metrics server
|
||||
metricsServer := metrics.New(viper.GetString(config.Keys.MetricsHTTPBind))
|
||||
|
||||
// create clock module
|
||||
l.Debug("creating clock")
|
||||
clockMod := clock.NewClock()
|
||||
|
||||
// create database client
|
||||
l.Debug("creating database client")
|
||||
dbClient, err := bun.New(ctx)
|
||||
if err != nil {
|
||||
l.Errorf("db: %s", err.Error())
|
||||
cancel()
|
||||
|
||||
return err
|
||||
}
|
||||
dbCacheClient, err := cachemem.New(ctx, dbClient)
|
||||
if err != nil {
|
||||
l.Errorf("db: %s", err.Error())
|
||||
cancel()
|
||||
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
err := dbCacheClient.Close(ctx)
|
||||
if err != nil {
|
||||
l.Errorf("closing db: %s", err.Error())
|
||||
}
|
||||
}()
|
||||
|
||||
// create http client
|
||||
httpClient, err := http.NewClient(ctx)
|
||||
if err != nil {
|
||||
l.Errorf("http client: %s", err.Error())
|
||||
cancel()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// create kv client
|
||||
kvClient, err := redis.New(ctx)
|
||||
if err != nil {
|
||||
l.Errorf("redis: %s", err.Error())
|
||||
cancel()
|
||||
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
err := kvClient.Close(ctx)
|
||||
if err != nil {
|
||||
l.Errorf("closing redis: %s", err.Error())
|
||||
}
|
||||
}()
|
||||
|
||||
// create tokenizer
|
||||
tokz, err := token.New()
|
||||
if err != nil {
|
||||
l.Errorf("create tokenizer: %s", err.Error())
|
||||
cancel()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// create logic module
|
||||
l.Debug("creating logic module")
|
||||
logicMod, err := logic1.New(ctx, clockMod, dbCacheClient, httpClient, kvClient, tokz)
|
||||
if err != nil {
|
||||
l.Errorf("logic: %s", err.Error())
|
||||
cancel()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// create runner
|
||||
runnerMod, err := faktory.New(logicMod)
|
||||
if err != nil {
|
||||
l.Errorf("runner: %s", err.Error())
|
||||
cancel()
|
||||
|
||||
return err
|
||||
}
|
||||
logicMod.SetRunner(runnerMod)
|
||||
runnerMod.Start(ctx)
|
||||
|
||||
// ** start application **
|
||||
errChan := make(chan error)
|
||||
|
||||
// Wait for SIGINT and SIGTERM (HIT CTRL-C)
|
||||
stopSigChan := make(chan os.Signal)
|
||||
signal.Notify(stopSigChan, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
// start metrics server
|
||||
go func(m *metrics.Module, errChan chan error) {
|
||||
l.Debug("starting metrics server")
|
||||
err := m.Start()
|
||||
if err != nil {
|
||||
errChan <- fmt.Errorf("metrics server: %s", err.Error())
|
||||
}
|
||||
}(metricsServer, errChan)
|
||||
|
||||
// wait for event
|
||||
select {
|
||||
case sig := <-stopSigChan:
|
||||
l.Infof("got sig: %s", sig)
|
||||
cancel()
|
||||
case err := <-errChan:
|
||||
l.Fatal(err.Error())
|
||||
cancel()
|
||||
}
|
||||
|
||||
<-ctx.Done()
|
||||
l.Infof("done")
|
||||
return nil
|
||||
}
|
20
cmd/relay/flag/worker.go
Normal file
20
cmd/relay/flag/worker.go
Normal file
@ -0,0 +1,20 @@
|
||||
package flag
|
||||
|
||||
import (
|
||||
"git.ptzo.gdn/feditools/relay/internal/config"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
// Worker adds all flags for running the server.
|
||||
func Worker(cmd *cobra.Command, values config.Values) {
|
||||
Redis(cmd, values)
|
||||
|
||||
// server
|
||||
cmd.PersistentFlags().String(config.Keys.ServerExternalHostname, values.ServerExternalHostname, usage.ServerExternalHostname)
|
||||
|
||||
// runner
|
||||
cmd.PersistentFlags().Int(config.Keys.RunnerConcurrency, values.RunnerConcurrency, usage.RunnerConcurrency)
|
||||
|
||||
// metrics
|
||||
cmd.PersistentFlags().String(config.Keys.MetricsHTTPBind, values.MetricsHTTPBind, usage.MetricsHTTPBind)
|
||||
}
|
@ -49,8 +49,9 @@ func main() {
|
||||
|
||||
// add commands
|
||||
rootCmd.AddCommand(accountCommands())
|
||||
rootCmd.AddCommand(serverCommands())
|
||||
rootCmd.AddCommand(databaseCommands())
|
||||
rootCmd.AddCommand(serverCommands())
|
||||
rootCmd.AddCommand(workerCommands())
|
||||
|
||||
err = rootCmd.Execute()
|
||||
if err != nil {
|
||||
|
33
cmd/relay/worker.go
Normal file
33
cmd/relay/worker.go
Normal file
@ -0,0 +1,33 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"git.ptzo.gdn/feditools/relay/cmd/relay/action/worker"
|
||||
"git.ptzo.gdn/feditools/relay/cmd/relay/flag"
|
||||
"git.ptzo.gdn/feditools/relay/internal/config"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
// workerCommands returns the 'worker' subcommand
|
||||
func workerCommands() *cobra.Command {
|
||||
workerCmd := &cobra.Command{
|
||||
Use: "worker",
|
||||
Short: "controls a relay worker",
|
||||
}
|
||||
|
||||
workerStartCmd := &cobra.Command{
|
||||
Use: "start",
|
||||
Short: "start the relay worker",
|
||||
PreRunE: func(cmd *cobra.Command, args []string) error {
|
||||
return preRun(cmd)
|
||||
},
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
return run(cmd.Context(), worker.Start)
|
||||
},
|
||||
}
|
||||
|
||||
flag.Worker(workerStartCmd, config.Defaults)
|
||||
|
||||
workerCmd.AddCommand(workerStartCmd)
|
||||
|
||||
return workerCmd
|
||||
}
|
Loading…
Reference in New Issue
Block a user