You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

101 lines
2.5 KiB

package shock
import (
"fmt"
"go.etcd.io/bbolt"
)
// Sequencer is implemented by values that can receive a unique sequence ID.
//
// Put calls AssignSeq with the bucket's next sequence number before the value
// is serialized.
type Sequencer interface {
	// AssignSeq hands the value the sequence ID it is about to be stored under.
	AssignSeq(seq uint64)
}
// Put stores val in the named bucket under the bucket's next sequence number.
//
// Inside a single d.Bolt.Update call it asks the bucket for its next sequence
// number, passes that number to val.AssignSeq, serializes val with
// d.Serializer, and writes the result using the decimal string form of the
// sequence number as the key.
//
// It returns an error wrapping bbolt.ErrBucketNotFound when the bucket does
// not exist, and otherwise any error reported by sequence generation, the
// serializer, or the write.
func (d *DB) Put(bucket string, val Sequencer) error {
	return d.Bolt.Update(func(tx *bbolt.Tx) error {
		b := tx.Bucket([]byte(bucket))
		if b == nil {
			// tx.Bucket yields nil for an unknown bucket; using it would panic.
			return fmt.Errorf("bucket %q: %w", bucket, bbolt.ErrBucketNotFound)
		}
		seq, err := b.NextSequence()
		if err != nil {
			return err
		}
		// Assign before marshalling so the stored form carries its own ID.
		val.AssignSeq(seq)
		serial, err := d.Serializer.Marshal(val)
		if err != nil {
			return err
		}
		return b.Put([]byte(fmt.Sprintf("%d", seq)), serial)
	})
}
// PutWithKey stores val in the named bucket under a caller-chosen key.
//
// The key written to the bucket is the fmt.Sprint rendering of key, and the
// value is val as serialized by d.Serializer. The write happens inside a
// d.Bolt.Update call.
//
// It returns an error wrapping bbolt.ErrBucketNotFound when the bucket does
// not exist, and otherwise any error reported by the serializer or the write.
func (d *DB) PutWithKey(bucket string, key, val interface{}) error {
	return d.Bolt.Update(func(tx *bbolt.Tx) error {
		b := tx.Bucket([]byte(bucket))
		if b == nil {
			// tx.Bucket yields nil for an unknown bucket; using it would panic.
			return fmt.Errorf("bucket %q: %w", bucket, bbolt.ErrBucketNotFound)
		}
		serial, err := d.Serializer.Marshal(val)
		if err != nil {
			return err
		}
		return b.Put([]byte(fmt.Sprint(key)), serial)
	})
}
// Get loads the entry stored under id in the named bucket and decodes it into
// val using d.Serializer.
//
// The lookup key is the fmt.Sprint rendering of id, which matches the keys
// written by Put and PutWithKey. The read happens inside a d.Bolt.View call.
//
// It returns an error wrapping bbolt.ErrBucketNotFound when the bucket does
// not exist, and otherwise whatever the serializer's Unmarshal returns.
func (d *DB) Get(bucket string, id, val interface{}) error {
	return d.Bolt.View(func(tx *bbolt.Tx) error {
		b := tx.Bucket([]byte(bucket))
		if b == nil {
			// tx.Bucket yields nil for an unknown bucket; using it would panic.
			return fmt.Errorf("bucket %q: %w", bucket, bbolt.ErrBucketNotFound)
		}
		// NOTE(review): bbolt documents a nil result for an absent key; that nil
		// is passed straight to the serializer, which decides how to report it.
		serial := b.Get([]byte(fmt.Sprint(id)))
		return d.Serializer.Unmarshal(serial, val)
	})
}
// Delete removes the entry stored under id from the named bucket.
//
// The key removed is the fmt.Sprint rendering of id. The removal happens
// inside a d.Bolt.Update call.
//
// It returns an error wrapping bbolt.ErrBucketNotFound when the bucket does
// not exist, and otherwise any error reported by the bucket's Delete.
func (d *DB) Delete(bucket string, id interface{}) error {
	return d.Bolt.Update(func(tx *bbolt.Tx) error {
		b := tx.Bucket([]byte(bucket))
		if b == nil {
			// tx.Bucket yields nil for an unknown bucket; using it would panic.
			return fmt.Errorf("bucket %q: %w", bucket, bbolt.ErrBucketNotFound)
		}
		return b.Delete([]byte(fmt.Sprint(id)))
	})
}
// Count returns the number of entries in the named bucket.
//
// The count is obtained by visiting every entry with the bucket's ForEach
// inside a d.Bolt.View call and incrementing a counter once per entry.
//
// It returns an error wrapping bbolt.ErrBucketNotFound when the bucket does
// not exist, and otherwise any error reported by the iteration.
func (d *DB) Count(bucket string) (int, error) {
	count := 0
	err := d.Bolt.View(func(tx *bbolt.Tx) error {
		b := tx.Bucket([]byte(bucket))
		if b == nil {
			// tx.Bucket yields nil for an unknown bucket; using it would panic.
			return fmt.Errorf("bucket %q: %w", bucket, bbolt.ErrBucketNotFound)
		}
		return b.ForEach(func(_, _ []byte) error {
			count++
			return nil
		})
	})
	return count, err
}
// EachFunc is the callback invoked for every entry while iterating a bucket.
// It receives the entry's key (id) and its stored bytes (serial); the error it
// returns is handed back to the iteration that called it.
type EachFunc func(id []byte, serial []byte) error
// ViewEach walks every entry of the named bucket without write access,
// passing each key and its stored bytes to fn.
func (d *DB) ViewEach(bucket string, fn EachFunc) error {
	const writable = false
	return d.forEach(bucket, writable, fn)
}
// UpdateEach walks every entry of the named bucket inside a writable
// transaction, passing each key and its stored bytes to fn.
func (d *DB) UpdateEach(bucket string, fn EachFunc) error {
	const writable = true
	return d.forEach(bucket, writable, fn)
}
// forEach begins a transaction on d.Bolt (writable or read-only as requested),
// runs fn over every entry of the named bucket, and then finishes the
// transaction: Commit when writable, Rollback otherwise.
//
// The transaction is always released. If the bucket is missing or the
// iteration returns an error, the deferred rollback closes it, so a failed
// iteration no longer leaves the transaction open.
//
// It returns any error from beginning the transaction, an error wrapping
// bbolt.ErrBucketNotFound when the bucket does not exist, the first error
// returned by the iteration, or the error from the final Commit/Rollback.
func (d *DB) forEach(bucket string, writable bool, fn EachFunc) error {
	tx, err := d.Bolt.Begin(writable)
	if err != nil {
		return err
	}
	// Safety net for every early return. Once the transaction has been
	// committed or rolled back below, this second Rollback only reports that
	// the transaction is already closed, which is deliberately ignored.
	defer func() { _ = tx.Rollback() }()

	b := tx.Bucket([]byte(bucket))
	if b == nil {
		// tx.Bucket yields nil for an unknown bucket; using it would panic.
		return fmt.Errorf("bucket %q: %w", bucket, bbolt.ErrBucketNotFound)
	}
	if err := b.ForEach(fn); err != nil {
		return err
	}
	if writable {
		return tx.Commit()
	}
	return tx.Rollback()
}