Combine server into single 'git-style' executable

This commit is contained in:
2013-09-02 09:36:03 -04:00
parent a6c296e0fd
commit 2599717d09
9 changed files with 43 additions and 38 deletions

85
asinkd/admin_rpc.go Normal file
View File

@ -0,0 +1,85 @@
package main
import (
"net"
"net/http"
"net/rpc"
)
type UserModifier struct {
adb *AsinkDB
}
type UserModifierArgs struct {
Current *User
Updated *User
UpdateLogin bool
UpdateRole bool
UpdatePassword bool
}
func (u *UserModifier) AddUser(user *User, result *int) error {
err := u.adb.DatabaseAddUser(user)
if err != nil {
*result = 1
} else {
*result = 0
}
return err
}
func (u *UserModifier) ModifyUser(args *UserModifierArgs, result *int) error {
currentUser, err := u.adb.DatabaseGetUser(args.Current.Username)
if err != nil {
*result = 1
return err
}
if args.UpdateLogin {
currentUser.Username = args.Updated.Username
}
if args.UpdateRole {
currentUser.Role = args.Updated.Role
}
if args.UpdatePassword {
currentUser.PWHash = args.Updated.PWHash
}
err = u.adb.DatabaseUpdateUser(currentUser)
if err != nil {
*result = 1
return err
}
*result = 0
return nil
}
func (u *UserModifier) RemoveUser(user *User, result *int) error {
err := u.adb.DatabaseDeleteUser(user)
if err != nil {
*result = 1
} else {
*result = 0
}
return err
}
func StartRPC(tornDown chan int, adb *AsinkDB) {
defer func() { tornDown <- 0 }() //the main thread waits for this to ensure the socket is closed
usermod := new(UserModifier)
usermod.adb = adb
rpc.Register(usermod)
rpc.HandleHTTP()
l, err := net.Listen("unix", "/tmp/asink.sock")
if err != nil {
panic(err)
}
defer l.Close()
go http.Serve(l, nil)
WaitOnExit()
}

261
asinkd/database.go Normal file
View File

@ -0,0 +1,261 @@
package main
import (
"asink"
"database/sql"
"errors"
_ "github.com/mattn/go-sqlite3"
"sync"
)
type AsinkDB struct {
db *sql.DB
lock sync.Mutex
}
var DuplicateUsernameErr = errors.New("Username already exists")
var NoUserErr = errors.New("User doesn't exist")
func GetAndInitDB() (*AsinkDB, error) {
dbLocation := "asink-server.db" //TODO make me configurable
db, err := sql.Open("sqlite3", "file:"+dbLocation+"?cache=shared&mode=rwc")
if err != nil {
return nil, err
}
//make sure all the tables are created
tx, err := db.Begin()
if err != nil {
return nil, err
}
rows, err := tx.Query("SELECT name FROM sqlite_master WHERE type='table' AND name='events';")
if err != nil {
return nil, err
}
if !rows.Next() {
//if this is false, it means no rows were returned
tx.Exec("CREATE TABLE events (id INTEGER PRIMARY KEY ASC, userid INTEGER, type INTEGER, path TEXT, hash TEXT, predecessor TEXT, timestamp INTEGER, permissions INTEGER);")
tx.Exec("CREATE INDEX IF NOT EXISTS pathidx on events (path);")
tx.Exec("CREATE INDEX IF NOT EXISTS timestampidx on events (timestamp);")
} else {
rows.Close()
}
rows, err = tx.Query("SELECT name FROM sqlite_master WHERE type='table' AND name='users';")
if err != nil {
return nil, err
}
if !rows.Next() {
//if this is false, it means no rows were returned
tx.Exec("CREATE TABLE users (id INTEGER PRIMARY KEY ASC, username TEXT, pwhash TEXT, role INTEGER);")
} else {
rows.Close()
}
err = tx.Commit()
if err != nil {
return nil, err
}
ret := new(AsinkDB)
ret.db = db
return ret, nil
}
func (adb *AsinkDB) DatabaseAddEvent(u *User, e *asink.Event) (err error) {
adb.lock.Lock()
tx, err := adb.db.Begin()
if err != nil {
return err
}
//make sure the transaction gets rolled back on error, and the database gets unlocked
defer func() {
if err != nil {
tx.Rollback()
}
adb.lock.Unlock()
}()
result, err := tx.Exec("INSERT INTO events (userid, type, path, hash, predecessor, timestamp, permissions) VALUES (?,?,?,?,?,?,?);", u.Id, e.Type, e.Path, e.Hash, e.Predecessor, e.Timestamp, e.Permissions)
if err != nil {
return err
}
id, err := result.LastInsertId()
if err != nil {
return err
}
err = tx.Commit()
if err != nil {
return err
}
e.Id = id
e.InDB = true
return nil
}
func (adb *AsinkDB) DatabaseRetrieveEvents(firstId uint64, maxEvents uint, u *User) (events []*asink.Event, err error) {
adb.lock.Lock()
//make sure the database gets unlocked on return
defer func() {
adb.lock.Unlock()
}()
rows, err := adb.db.Query("SELECT id, type, path, hash, predecessor, timestamp, permissions FROM events WHERE userid = ? AND id >= ? ORDER BY id ASC LIMIT ?;", u.Id, firstId, maxEvents)
if err != nil {
return nil, err
}
for rows.Next() {
var event asink.Event
err = rows.Scan(&event.Id, &event.Type, &event.Path, &event.Hash, &event.Predecessor, &event.Timestamp, &event.Permissions)
if err != nil {
return nil, err
}
events = append(events, &event)
}
return events, nil
}
func (adb *AsinkDB) DatabaseAddUser(u *User) (err error) {
adb.lock.Lock()
tx, err := adb.db.Begin()
if err != nil {
return err
}
//make sure the transaction gets rolled back on error, and the database gets unlocked
defer func() {
if err != nil {
tx.Rollback()
}
adb.lock.Unlock()
}()
//make sure the username we're switching to doesn't already exist in the database
existingUsername := ""
row := tx.QueryRow("SELECT username FROM users WHERE username == ?;", u.Username)
err = row.Scan(&existingUsername)
switch {
case err == sql.ErrNoRows:
//keep going
case err != nil:
return err
default:
return DuplicateUsernameErr
}
result, err := tx.Exec("INSERT INTO users (username, pwhash, role) VALUES (?,?,?);", u.Username, u.PWHash, u.Role)
if err != nil {
return err
}
u.Id, err = result.LastInsertId()
if err != nil {
return err
}
err = tx.Commit()
if err != nil {
return err
}
return nil
}
//set attributes for the user with the same Id as *u
func (adb *AsinkDB) DatabaseUpdateUser(u *User) (err error) {
adb.lock.Lock()
tx, err := adb.db.Begin()
if err != nil {
return err
}
//make sure the transaction gets rolled back on error, and the database gets unlocked
defer func() {
if err != nil {
tx.Rollback()
}
adb.lock.Unlock()
}()
//make sure the username we're switching to doesn't already exist in the database
existingUsername := ""
row := tx.QueryRow("SELECT username FROM users WHERE username == ? AND id != ?;", u.Username, u.Id)
err = row.Scan(&existingUsername)
switch {
case err == sql.ErrNoRows:
//keep going
case err != nil:
return err
default:
return DuplicateUsernameErr
}
_, err = tx.Exec("UPDATE users SET username=?, pwhash=?, role=? WHERE id=?;", u.Username, u.PWHash, u.Role, u.Id)
if err != nil {
return err
}
err = tx.Commit()
if err != nil {
return err
}
return nil
}
func (adb *AsinkDB) DatabaseGetUser(username string) (user *User, err error) {
adb.lock.Lock()
//make sure the database gets unlocked
defer adb.lock.Unlock()
row := adb.db.QueryRow("SELECT id, username, pwhash, role FROM users WHERE username == ?;", username)
user = new(User)
err = row.Scan(&user.Id, &user.Username, &user.PWHash, &user.Role)
switch {
case err == sql.ErrNoRows:
return nil, NoUserErr
case err != nil:
return nil, err
default:
return user, nil
}
}
func (adb *AsinkDB) DatabaseDeleteUser(u *User) (err error) {
adb.lock.Lock()
tx, err := adb.db.Begin()
if err != nil {
return err
}
//make sure the transaction gets rolled back on error, and the database gets unlocked
defer func() {
if err != nil {
tx.Rollback()
}
adb.lock.Unlock()
}()
res, err := tx.Exec("DELETE FROM users WHERE username=?;", u.Username)
if err != nil {
return err
}
rows, err := res.RowsAffected()
if err != nil {
return err
}
if rows == 0 {
return NoUserErr
} else if rows > 1 {
return errors.New("Error: attempting to delete user by username, but more than row will be affected: " + u.Username)
}
err = tx.Commit()
if err != nil {
return err
}
return nil
}

43
asinkd/exit.go Normal file
View File

@ -0,0 +1,43 @@
package main
import (
"os"
"os/signal"
"sync/atomic"
)
var exitWaiterCount int32
var exitCalled chan int
var exitWaiterChan chan int
func init() {
exitWaiterCount = 0
exitWaiterChan = make(chan int)
go setupCleanExitOnSignals()
}
func setupCleanExitOnSignals() {
//wait to properly close the socket when we're exiting
exitCode := 0
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt)
defer signal.Stop(sig)
select {
case <-sig:
case exitCode = <-exitCalled:
}
for c := atomic.AddInt32(&exitWaiterCount, -1); c >= 0; c = atomic.AddInt32(&exitWaiterCount, -1) {
exitWaiterChan <- exitCode
}
}
func Exit(exitCode int) {
exitCalled <- exitCode
}
func WaitOnExit() int {
atomic.AddInt32(&exitWaiterCount, 1)
return <-exitWaiterChan
}

76
asinkd/longpolling.go Normal file
View File

@ -0,0 +1,76 @@
package main
import (
"asink"
"sync"
"time"
)
type LongPollGroup struct {
channels []*chan *asink.Event
lock sync.Mutex
}
type PollingManager struct {
lock sync.RWMutex
groups map[int64]*LongPollGroup
}
var pm *PollingManager
func init() {
pm = new(PollingManager)
pm.groups = make(map[int64]*LongPollGroup)
}
func addPoller(uid int64, channel *chan *asink.Event) {
pm.lock.RLock()
group := pm.groups[uid]
if group != nil {
group.lock.Lock()
pm.lock.RUnlock()
group.channels = append(group.channels, channel)
group.lock.Unlock()
} else {
pm.lock.RUnlock()
pm.lock.Lock()
group = new(LongPollGroup)
group.channels = append(group.channels, channel)
pm.groups[uid] = group
pm.lock.Unlock()
}
//set timer to call function after one minute
timeout := time.Duration(1) * time.Minute
time.AfterFunc(timeout, func() {
group.lock.Lock()
for i, c := range group.channels {
if c == channel {
copy(group.channels[i:], group.channels[i+1:])
group.channels = group.channels[:len(group.channels)-1]
break
}
}
group.lock.Unlock()
close(*channel)
})
}
func broadcastToPollers(uid int64, event *asink.Event) {
//store off the long polling group we're trying to send to and remove
//it from PollingManager.groups
pm.lock.Lock()
group := pm.groups[uid]
pm.groups[uid] = nil
pm.lock.Unlock()
//send event down each of group's channels
if group != nil {
group.lock.Lock()
for _, c := range group.channels {
*c <- event
}
group.lock.Unlock()
}
}

53
asinkd/main.go Normal file
View File

@ -0,0 +1,53 @@
package main
import (
"fmt"
"os"
)
type Command struct {
cmd string
fn func(args []string)
explanation string
}
var commands []Command = []Command{
Command{
cmd: "start",
fn: StartServer,
explanation: "Start the server daemon",
},
Command{
cmd: "useradd",
fn: UserAdd,
explanation: "Add a user",
},
Command{
cmd: "userdel",
fn: UserDel,
explanation: "Remove a user",
},
Command{
cmd: "usermod",
fn: UserMod,
explanation: "Modify a user",
},
}
func main() {
if len(os.Args) > 1 {
cmd := os.Args[1]
for _, c := range commands {
if c.cmd == cmd {
c.fn(os.Args[2:])
return
}
}
fmt.Println("Invalid subcommand specified, please pick from the following:")
} else {
fmt.Println("No subcommand specified, please pick one from the following:")
}
for _, c := range commands {
fmt.Printf("\t%s\t\t%s\n", c.cmd, c.explanation)
}
}

27
asinkd/rpc.go Normal file
View File

@ -0,0 +1,27 @@
package main
import (
"log"
"net"
"net/rpc"
"syscall"
)
func RPCCall(method string, args interface{}, reply interface{}) error {
socket := "/tmp/asink.sock"
client, err := rpc.DialHTTP("unix", socket)
if err != nil {
if err2, ok := err.(*net.OpError); ok {
if err2.Err == syscall.ENOENT {
log.Fatal("The socket (" + socket + ") was not found")
} else if err2.Err == syscall.ECONNREFUSED {
log.Fatal("A connection was refused to " + socket + ". Please check the permissions and ensure the server is running.")
}
}
return err
}
defer client.Close()
err = client.Call(method, args, reply)
return err
}

164
asinkd/rpc_users.go Normal file
View File

@ -0,0 +1,164 @@
package main
import (
"code.google.com/p/gopass"
"flag"
"fmt"
"net/rpc"
"os"
"strconv"
)
type boolIsSetFlag struct {
Value bool
IsSet bool //true if explicitly set from the command-line, false otherwise
}
func newBoolIsSetFlag(defaultValue bool) *boolIsSetFlag {
b := new(boolIsSetFlag)
b.Value = defaultValue
return b
}
func (b *boolIsSetFlag) Set(value string) error {
v, err := strconv.ParseBool(value)
b.Value = v
b.IsSet = true
return err
}
func (b *boolIsSetFlag) String() string { return fmt.Sprintf("%v", *b) }
func (b *boolIsSetFlag) IsBoolFlag() bool { return true }
func UserAdd(args []string) {
flags := flag.NewFlagSet("useradd", flag.ExitOnError)
admin := flags.Bool("admin", false, "User should be an administrator")
flags.Parse(args)
if flags.NArg() != 1 {
fmt.Println("Error: please supply a username (and only one)")
os.Exit(1)
}
passwordOne, err := gopass.GetPass("Enter password for new user: ")
if err != nil {
panic(err)
}
passwordTwo, err := gopass.GetPass("Enter the same password again: ")
if err != nil {
panic(err)
}
if passwordOne != passwordTwo {
fmt.Println("Error: Passwords do not match. Please try again.")
os.Exit(1)
}
user := new(User)
if *admin {
user.Role = ADMIN
} else {
user.Role = NORMAL
}
user.Username = flags.Arg(0)
user.PWHash = HashPassword(passwordOne)
i := 99
err = RPCCall("UserModifier.AddUser", user, &i)
if err != nil {
if _, ok := err.(rpc.ServerError); ok && err.Error() == DuplicateUsernameErr.Error() {
fmt.Println("Error: " + err.Error())
return
}
panic(err)
}
}
func UserDel(args []string) {
if len(args) != 1 {
fmt.Println("Error: please supply a username (and only one)")
os.Exit(1)
}
user := new(User)
user.Username = args[0]
i := 99
err := RPCCall("UserModifier.RemoveUser", user, &i)
if err != nil {
if _, ok := err.(rpc.ServerError); ok && err.Error() == NoUserErr.Error() {
fmt.Println("Error: " + err.Error())
return
}
panic(err)
}
}
func UserMod(args []string) {
rpcargs := new(UserModifierArgs)
rpcargs.Current = new(User)
rpcargs.Updated = new(User)
admin := newBoolIsSetFlag(false)
flags := flag.NewFlagSet("usermod", flag.ExitOnError)
flags.Var(admin, "admin", "User should be an administrator")
flags.BoolVar(&rpcargs.UpdatePassword, "password", false, "Change the user's password")
flags.BoolVar(&rpcargs.UpdatePassword, "p", false, "Change the user's password (short version)")
flags.BoolVar(&rpcargs.UpdateLogin, "login", false, "Change the user's username")
flags.BoolVar(&rpcargs.UpdateLogin, "l", false, "Change the user's username (short version)")
flags.Parse(args)
if flags.NArg() != 1 {
fmt.Println("Error: please supply a username (and only one)")
os.Exit(1)
}
rpcargs.Current.Username = flags.Arg(0)
if rpcargs.UpdateLogin == true {
fmt.Print("New login: ")
fmt.Scanf("%s", &rpcargs.Updated.Username)
}
if rpcargs.UpdatePassword {
passwordOne, err := gopass.GetPass("Enter new password for user: ")
if err != nil {
panic(err)
}
passwordTwo, err := gopass.GetPass("Enter the same password again: ")
if err != nil {
panic(err)
}
if passwordOne != passwordTwo {
fmt.Println("Error: Passwords do not match. Please try again.")
os.Exit(1)
}
rpcargs.Updated.PWHash = HashPassword(passwordOne)
}
//set the UpdateRole flag based on whether it was present on the command-line
rpcargs.UpdateRole = admin.IsSet
if admin.Value {
rpcargs.Updated.Role = ADMIN
} else {
rpcargs.Updated.Role = NORMAL
}
if !rpcargs.UpdateRole && !rpcargs.UpdateLogin && !rpcargs.UpdatePassword {
fmt.Println("What exactly are you modifying again?")
return
}
i := 99
err := RPCCall("UserModifier.ModifyUser", rpcargs, &i)
if err != nil {
if _, ok := err.(rpc.ServerError); ok && err.Error() == NoUserErr.Error() {
fmt.Println("Error: " + err.Error())
return
}
panic(err)
}
}

225
asinkd/server.go Normal file
View File

@ -0,0 +1,225 @@
package main
import (
"asink"
"encoding/base64"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"net"
"net/http"
"regexp"
"strconv"
"strings"
)
//global variables
var eventsRegexp *regexp.Regexp
var port int = 8080
var adb *AsinkDB
func init() {
var err error
eventsRegexp = regexp.MustCompile("^/events/([0-9]+)$")
adb, err = GetAndInitDB()
if err != nil {
panic(err)
}
}
func StartServer(args []string) {
const port_usage = "Port on which to serve HTTP API"
flags := flag.NewFlagSet("start", flag.ExitOnError)
flags.IntVar(&port, "port", 8080, port_usage)
flags.IntVar(&port, "p", 8080, port_usage+" (shorthand)")
flags.Parse(args)
rpcTornDown := make(chan int)
go StartRPC(rpcTornDown, adb)
http.HandleFunc("/", rootHandler)
http.HandleFunc("/events", eventHandler)
http.HandleFunc("/events/", eventHandler)
//TODO add HTTPS, something like http://golang.org/pkg/net/http/#ListenAndServeTLS
l, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
panic(err)
}
defer l.Close()
go http.Serve(l, nil)
//TODO handle errors from http.Serve?
WaitOnExit()
<-rpcTornDown
}
func rootHandler(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "You're probably looking for /events/")
}
func getEvents(w http.ResponseWriter, r *http.Request, user *User, nextEvent uint64) {
var events []*asink.Event
var error_message string = ""
defer func() {
var apiresponse asink.APIResponse
if error_message != "" {
apiresponse = asink.APIResponse{
Status: asink.ERROR,
Explanation: error_message,
}
} else {
apiresponse = asink.APIResponse{
Status: asink.SUCCESS,
Events: events,
}
}
b, err := json.Marshal(apiresponse)
if err != nil {
error_message = err.Error()
return
}
w.Write(b)
}()
events, err := adb.DatabaseRetrieveEvents(nextEvent, 50, user)
if err != nil {
error_message = err.Error()
return
}
//long-poll if events is empty
if len(events) == 0 {
c := make(chan *asink.Event)
addPoller(user.Id, &c) //TODO support more than one share per user
e, ok := <-c
if ok {
events = append(events, e)
}
}
}
func putEvents(w http.ResponseWriter, r *http.Request, user *User) {
var events asink.EventList
var error_message string = ""
defer func() {
var apiresponse asink.APIResponse
if error_message != "" {
apiresponse = asink.APIResponse{
Status: asink.ERROR,
Explanation: error_message,
}
} else {
apiresponse = asink.APIResponse{
Status: asink.SUCCESS,
}
}
b, err := json.Marshal(apiresponse)
if err != nil {
error_message = err.Error()
return
}
w.Write(b)
}()
body, err := ioutil.ReadAll(r.Body)
if err != nil {
error_message = err.Error()
return
}
err = json.Unmarshal(body, &events)
if err != nil {
error_message = err.Error()
return
}
for _, event := range events.Events {
err = adb.DatabaseAddEvent(user, event)
if err != nil {
//TODO should probably do this in a way that the caller knows how many of these have failed and doesn't re-try sending ones that succeeded
//i.e. add this to the return codes or something
//OR put all the DatabaseAddEvent's inside a SQL transaction, and rollback on any failure
error_message = err.Error()
return
}
}
broadcastToPollers(user.Id, events.Events[0]) //TODO support more than one user
}
func eventHandler(w http.ResponseWriter, r *http.Request) {
user := AuthenticateUser(r)
if user == nil {
w.Header().Set("WWW-Authenticate", "Basic realm=\"Asink Server\"")
apiresponse := asink.APIResponse{
Status: asink.ERROR,
Explanation: "This operation requires user authentication",
}
b, err := json.Marshal(apiresponse)
if err != nil {
b = []byte(err.Error())
}
w.WriteHeader(401)
w.Write(b)
return
}
if r.Method == "GET" {
//if GET, return any events later than (and including) the event id passed in
if sm := eventsRegexp.FindStringSubmatch(r.RequestURI); sm != nil {
i, err := strconv.ParseUint(sm[1], 10, 64)
if err != nil {
//TODO display error message here instead
fmt.Printf("ERROR parsing " + sm[1] + "\n")
getEvents(w, r, user, 0)
} else {
getEvents(w, r, user, i)
}
} else {
getEvents(w, r, user, 0)
}
} else if r.Method == "POST" {
putEvents(w, r, user)
} else {
apiresponse := asink.APIResponse{
Status: asink.ERROR,
Explanation: "Invalid HTTP method - only GET and POST are supported on this endpoint.",
}
b, _ := json.Marshal(apiresponse)
w.Write(b)
}
}
func AuthenticateUser(r *http.Request) (user *User) {
h, ok := r.Header["Authorization"]
if !ok {
return nil
}
authparts := strings.Split(h[0], " ")
if len(authparts) != 2 || authparts[0] != "Basic" {
return nil
}
userpass, err := base64.StdEncoding.DecodeString(authparts[1])
if err != nil {
return nil
}
splituserpass := strings.Split(string(userpass), ":")
if len(splituserpass) != 2 {
return nil
}
user, err = adb.DatabaseGetUser(splituserpass[0])
if err != nil || user == nil {
return nil
}
if user.ValidPassword(splituserpass[1]) {
return user
} else {
return nil
}
}

35
asinkd/users.go Normal file
View File

@ -0,0 +1,35 @@
package main
import (
"crypto/sha256"
"fmt"
)
type UserRole uint32
const (
//User roles
ADMIN = 1 << iota
NORMAL
)
type User struct {
Id int64
Username string
PWHash string
Role UserRole
}
func HashPassword(pw string) string {
hashfn := sha256.New()
hashfn.Write([]byte(pw))
return fmt.Sprintf("%x", hashfn.Sum(nil))
}
func (u *User) ValidPassword(pw string) bool {
return HashPassword(pw) == u.PWHash
}
func (u *User) IsAdmin() bool {
return u.Role&ADMIN == ADMIN
}