Merge pull request 'updates on dependencies + multithreading fixes' (#2) from develop into master
All checks were successful
continuous-integration/drone/push Build is passing

Reviewed-on: #2
This commit is contained in:
Paul 2023-09-14 15:42:49 +02:00
commit 995a10ca17
4 changed files with 41 additions and 36 deletions

View File

@ -40,7 +40,7 @@ func main() {
go models.ScanIP(&cfg)
}
if cfg.Switchs.Debug {
if cfg.Options.Debug {
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()

View File

@ -10,7 +10,6 @@ import (
func (cfg *Config) GetConfig() error {
var configfile string
var debug bool
var drop bool
var init bool
var port int
@ -21,7 +20,6 @@ func (cfg *Config) GetConfig() error {
flag.StringVar(&configfile, "configfile", "ipbl.ini", "Configuration file to use with ipbl section")
flag.IntVar(&port, "port", 8099, "Web service port to use")
flag.BoolVar(&debug, "debug", false, "If debug logging must be enabled")
flag.BoolVar(&drop, "drop", false, "If dropping tables must occur")
flag.BoolVar(&init, "init", false, "If init of database must be done")
flag.BoolVar(&version, "version", false, "Show version")
@ -29,7 +27,6 @@ func (cfg *Config) GetConfig() error {
flag.Parse()
cfg.Switchs.Debug = debug
cfg.Switchs.Drop = drop
cfg.Switchs.Init = init
cfg.Switchs.Port = port
@ -50,6 +47,7 @@ func (cfg *Config) GetConfig() error {
cfg.Options.SocketChannel = "ipbl"
cfg.Options.HideBanner = ipblsection.Key("hidebanner").MustBool(false)
cfg.Options.Debug = ipblsection.Key("debug").MustBool(false)
return nil
}
@ -66,11 +64,11 @@ type Config struct {
Version string
HideBanner bool
SocketChannel string
Debug bool
}
Switchs struct {
Port int
NoFeed bool
Debug bool
Drop bool
Init bool
Version bool

View File

@ -30,7 +30,7 @@ func Initialize(ctx *context.Context, cfg *config.Config) (err error) {
}
cfg.Db.SetMapper(names.GonicMapper{})
cfg.Db.SetQuotePolicy(dialects.QuotePolicyReserved)
cfg.Db.ShowSQL(cfg.Switchs.Debug)
cfg.Db.ShowSQL(cfg.Options.Debug)
//cfg.Db.SetDefaultCacher(cacher)

View File

@ -8,6 +8,7 @@ import (
"io"
"log"
"net/http"
"runtime"
"sync"
"time"
@ -131,21 +132,30 @@ func InsertIPBulk(session *xorm.Session, ips *[]IP) (numinsert int64, numupdate
}
func ScanIP(cfg *config.Config) (err error) {
var numthreads = 8
var numthreads int = runtime.NumCPU() / 2
for {
session := cfg.Db.NewSession()
orphans := []IP{}
err = session.Where(`((as_id IS NULL
OR country_id IS NULL
OR city_id IS NULL
OR rdns = ''
OR rdns IS NULL) AND updated < now()-'1d'::interval) OR
(as_id IS NULL
AND country_id IS NULL
AND city_id IS NULL
AND rdns IS NULL)
`).Desc("updated").Limit(SCANLIMIT).Find(&orphans)
session.Close()
err = session.Where(`
(
(
as_id IS NULL
OR country_id IS NULL
OR city_id IS NULL
OR rdns = ''
OR rdns IS NULL
)
AND updated < now()-'1d'::interval
)
OR
(
as_id IS NULL
AND country_id IS NULL
AND city_id IS NULL
AND rdns IS NULL
)
`).Desc("updated").Limit(SCANLIMIT).Find(&orphans)
defer session.Close()
if err == nil && len(orphans) > 0 {
orphanchan := make(chan IP)
@ -161,10 +171,9 @@ func ScanIP(cfg *config.Config) (err error) {
}
close(orphanchan)
<-done
wg.Wait()
} else {
time.Sleep(30 * time.Second)
time.Sleep(1 * time.Second)
}
}
}
@ -172,19 +181,21 @@ func ScanIP(cfg *config.Config) (err error) {
func ScanOrphan(wg *sync.WaitGroup, orphans chan IP, done chan bool, thr int, cfg *config.Config) (err error) {
wg.Add(1)
session := cfg.Db.NewSession()
defer session.Close()
queryclient := http.Client{}
for {
orphan, more := <-orphans
if more {
session := cfg.Db.NewSession()
if err != nil {
log.Println(err)
}
queryclient := http.Client{}
var query QueryIP
query, err := QueryInfo(&queryclient, orphan.IP)
if err != nil {
log.Println(err)
time.Sleep(10 * time.Minute)
time.Sleep(1 * time.Minute)
continue
}
@ -202,8 +213,8 @@ func ScanOrphan(wg *sync.WaitGroup, orphans chan IP, done chan bool, thr int, cf
}
orphan.Rdns = sql.NullString{String: query.Rdns, Valid: true}
if cfg.Switchs.Debug {
log.Printf("%s -> \"%s\"\n", orphan.IP, query.Rdns)
if cfg.Options.Debug {
log.Printf("t%d: %s -> \"%s\"\n", thr, orphan.IP, query.Rdns)
}
_, err = orphan.GetOrCreate(session)
@ -211,21 +222,17 @@ func ScanOrphan(wg *sync.WaitGroup, orphans chan IP, done chan bool, thr int, cf
continue
}
err = session.Commit()
if err != nil {
log.Println(err)
}
err = session.Close()
if err != nil {
log.Println(err)
}
} else {
//fmt.Printf("All orphan migrated on thread num %d\n", thr)
fmt.Printf("All orphan migrated on thread num %d\n", thr)
wg.Done()
done <- true
return nil
break
}
}
err = session.Commit()
if err != nil {
log.Println(err)
}
return nil
}
func QueryInfo(client *http.Client, ip string) (query QueryIP, err error) {