Added chat microservice, removed air

This commit is contained in:
2022-10-20 18:45:58 +02:00
parent bdd1a655d4
commit bff6cdd434
24 changed files with 424 additions and 159 deletions

View File

@@ -1,15 +1,43 @@
package app
import (
"net/http"
"twitch-clone/chat-service/logic"
"twitch-clone/chat-service/models"
"twitch-clone/chat-service/repository"
"twitch-clone/chat-service/repository/scylla"
service "twitch-clone/chat-service/services"
"github.com/joho/godotenv"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
)
func Run() {
err := godotenv.Load()
if err != nil {
panic(err)
}
e := echo.New()
e.GET("/", func(c echo.Context) error {
return c.String(http.StatusOK, "Hello, World!")
var c repository.ChatRepository = scylla.NewChatRepository()
var s service.ChatService = logic.NewChatService(c)
e.Use(middleware.Logger())
e.Use(middleware.Recover())
e.GET("*", func(c echo.Context) error {
return s.Subscribe(c.Response().Writer, c.Request())
})
e.POST("*", func(c echo.Context) error {
return s.Publish(c.Request().URL.Path, &models.ChatMessage{
FromUserID: 0,
FromUser: "niku",
ToUserID: 0,
ToUser: "niku",
Content: "Welcome",
})
})
e.Logger.Fatal(e.Start(":1323"))
}

View File

@@ -0,0 +1,70 @@
package logic
import (
"encoding/json"
"net/http"
"time"
"twitch-clone/chat-service/models"
"twitch-clone/chat-service/repository"
"twitch-clone/chat-service/serializer"
"github.com/bwmarrin/snowflake"
"github.com/olahol/melody"
)
type chatService struct {
ChatRepo repository.ChatRepository
Melody melody.Melody
MsgSerializer serializer.JsonMessageSerializer
Snowflake snowflake.Node
}
func NewChatService(chatRepo repository.ChatRepository) *chatService {
flakeGen, err := snowflake.NewNode(0)
if err != nil {
panic(err)
}
c := &chatService{
ChatRepo: chatRepo,
Melody: *melody.New(),
MsgSerializer: serializer.JsonMessageSerializer{},
Snowflake: *flakeGen,
}
c.Melody.HandleMessage(func(s *melody.Session, b []byte) {
msg, err := c.MsgSerializer.Decode(b)
if err != nil {
bytes, _ := json.Marshal(err.Error())
s.Write(bytes)
return
}
c.Publish(s.Request.URL.Path, msg)
})
return c
}
func (c *chatService) Subscribe(w http.ResponseWriter, r *http.Request) error {
return c.Melody.HandleRequest(w, r)
}
func (c *chatService) Publish(namespace string, msg *models.ChatMessage) error {
msg.MessageID = c.Snowflake.Generate().Int64()
msg.CreatedAt = time.Now().Unix()
rawMsg, err := c.MsgSerializer.Encode(msg)
if err != nil {
return err
}
err = c.Melody.BroadcastFilter(rawMsg, func(q *melody.Session) bool {
return q.Request.URL.Path == namespace
})
if err != nil {
return err
}
err = c.ChatRepo.Store(msg)
return err
}

View File

@@ -0,0 +1,11 @@
package models
type ChatMessage struct {
MessageID int64 `json:"messageId,omitempty"`
FromUserID int64 `validate:"required" json:"fromUserID"`
FromUser string `validate:"required" json:"fromUser"`
ToUserID int64 `validate:"required" json:"toUserID"`
ToUser string `validate:"required" json:"toUser"`
Content string `validate:"required" json:"content"`
CreatedAt int64 `json:"createdAt,omitempty"`
}

View File

@@ -0,0 +1,9 @@
package repository
import (
"twitch-clone/chat-service/models"
)
type ChatRepository interface {
Store(*models.ChatMessage) error
}

View File

@@ -0,0 +1,48 @@
package scylla
import (
"os"
"twitch-clone/chat-service/models"
"github.com/gocql/gocql"
"github.com/scylladb/gocqlx/v2"
"github.com/scylladb/gocqlx/v2/table"
)
var messageMetadata = table.Metadata{
Name: "chat_service.messages",
Columns: []string{"message_id", "from_user_id", "from_user", "to_user_id", "to_user", "content", "created_at"},
}
var messageTable = table.New(messageMetadata)
type ChatRepository struct {
cluster gocql.ClusterConfig
}
func (r *ChatRepository) Store(msg *models.ChatMessage) error {
session, err := gocqlx.WrapSession(r.cluster.CreateSession())
if err != nil {
return err
}
defer session.Close()
q := session.Query(messageTable.Insert()).BindStruct(msg)
if err := q.ExecRelease(); err != nil {
return err
}
return nil
}
func NewChatRepository() *ChatRepository {
cluster := gocql.NewCluster(os.Getenv("CHAT_SCYLLA_HOSTS"))
session, _ := gocqlx.WrapSession(cluster.CreateSession())
Seed(session)
session.Close()
return &ChatRepository{
cluster: *cluster,
}
}

View File

@@ -0,0 +1,30 @@
package scylla
import "github.com/scylladb/gocqlx/v2"
func Seed(session gocqlx.Session) error {
err := session.ExecStmt(`
CREATE KEYSPACE IF NOT EXISTS chat_service
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}`)
if err != nil {
return err
}
err = session.ExecStmt(`
CREATE TABLE IF NOT EXISTS chat_service.messages (
message_id bigint,
from_user_id bigint,
from_user text,
to_user_id bigint,
to_user text,
content text,
created_at timestamp,
PRIMARY KEY (to_user_id, message_id)
)`)
if err != nil {
return err
}
return nil
}

View File

@@ -0,0 +1,31 @@
package serializer
import (
"encoding/json"
"twitch-clone/chat-service/models"
"twitch-clone/chat-service/utils"
)
type JsonMessageSerializer struct{}
func (s *JsonMessageSerializer) Decode(input []byte) (*models.ChatMessage, error) {
msg := &models.ChatMessage{}
if err := json.Unmarshal(input, msg); err != nil {
return nil, err
}
if err := utils.Validate.Struct(msg); err != nil {
return nil, err
}
return msg, nil
}
func (s *JsonMessageSerializer) Encode(input *models.ChatMessage) ([]byte, error) {
msg, err := json.Marshal(input)
if err != nil {
return nil, err
}
return msg, nil
}

View File

@@ -0,0 +1,8 @@
package serializer
import "twitch-clone/chat-service/models"
type MessageSerializer interface {
Decode(input []byte) (*models.ChatMessage, error)
Encode(input *models.ChatMessage) ([]byte, error)
}

View File

@@ -0,0 +1,11 @@
package service
import (
"net/http"
"twitch-clone/chat-service/models"
)
type ChatService interface {
Subscribe(http.ResponseWriter, *http.Request) error
Publish(string, *models.ChatMessage) error
}

View File

@@ -0,0 +1,5 @@
package utils
import "github.com/go-playground/validator/v10"
var Validate = validator.New()